redis和kafka读取代码
kafka讀取代碼如下所示:
<?php
$conf = new RdKafka\Conf();
//設置消費組
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
// 設置offset的存儲為file
$topic = $rk->newTopic("test", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
? ? //參數1表示消費分區,這里是分區0
? ? //參數2表示同步阻塞多久
? ? $message = $topic->consume(0, 12 * 1000);
? ? if (is_null($message)) {
? ? ? ? echo "No more messages\n";
? ? ? ? continue;
? ? }
? ? switch ($message->err) {
? ? ? ? case RD_KAFKA_RESP_ERR_NO_ERROR:
? ? ? ? ? ? var_dump($message);
? ? ? ? ? ? break;
? ? ? ? case RD_KAFKA_RESP_ERR__PARTITION_EOF:
? ? ? ? ? ? echo "No more messages; will wait for more\n";
? ? ? ? ? ? break;
? ? ? ? case RD_KAFKA_RESP_ERR__TIMED_OUT:
? ? ? ? ? ? echo "Timed out\n";
? ? ? ? ? ? break;
? ? ? ? default:
? ? ? ? ? ? throw new \Exception($message->errstr(), $message->err);
? ? ? ? ? ? break;
? ? }
}
?>
redis讀取代碼如下所示:
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
for ($i = 0; $i < 20; $i++) {
? ? //$redis->set("huancai . $i","huancai.$i");
? ? $value=$redis->get("huancai . $i");
? ? var_dump($value);
}
?>
?
總結
以上是生活随笔為你收集整理的redis和kafka读取代码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 君生我未生小说
- 下一篇: 重返十七岁综艺节目歌曲(重返十七岁综艺节