kafka0.8消费者实例
生活随笔
收集整理的這篇文章主要介紹了
kafka0.8消费者实例
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
為什么80%的碼農都做不了架構師?>>> ??
序
這里簡單展示一下如何使用kafka0.8的client去消費一個topic。
maven
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.8.2.2</version></dependency>初始化客戶端
Properties props = new Properties();props.put("zookeeper.connect", zk); // props.put("auto.offset.reset","smallest");props.put("group.id",group);props.put("zookeeper.session.timeout.ms", "10000");props.put("zookeeper.sync.time.ms", "2000");props.put("auto.commit.interval.ms", "10000");props.put("consumer.timeout.ms","10000"); //設置ConsumerIterator的hasNext的超時時間,不設置則永遠阻塞直到有新消息來props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, consumerCount);Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);并發消費
consumerMap.get(topic).stream().forEach(stream -> {pool.submit(new Runnable() {@Overridepublic void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator();//it.hasNext()取決于consumer.timeout.ms的值,默認為-1try{while (it.hasNext()) {System.out.println(Thread.currentThread().getName()+" hello");//是hasNext拋出異常,而不是next拋出System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message()));}}catch (ConsumerTimeoutException e){e.printStackTrace();}System.out.println(Thread.currentThread().getName()+" end");}});});注意事項
消費者實例數*每個實例的消費線程數 <= topic的partition數量,否則多余的就浪費了。
轉載于:https://my.oschina.net/go4it/blog/1544496
總結
以上是生活随笔為你收集整理的kafka0.8消费者实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 车损险要不要买 车损险能够买吗
- 下一篇: 完全虚拟化和半虚拟化区别