kafak消费者从头开始消费(消费者组)
生活随笔
收集整理的這篇文章主要介紹了
kafak消费者从头开始消费(消费者组)
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
【README】
本文主要用于描述 kafka 消費(fèi)者如何從頭開(kāi)始消費(fèi);
【1】從頭開(kāi)始消費(fèi)
1)從頭開(kāi)始消費(fèi),需要滿(mǎn)足兩個(gè)條件, 如下:
- 條件1, 使用一個(gè)全新的消費(fèi)者組id;
- 條件2,指定 auto.offset.reset 為 earliest ;
2)代碼如下:
public static void main(String[] args) {/* 1.創(chuàng)建消費(fèi)者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2開(kāi)啟自動(dòng)提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自動(dòng)提交的延時(shí)*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費(fèi)者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan2"); // group.id /*2.6 重置消費(fèi)者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費(fèi)者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("third", "second"));/* 循環(huán)拉取 */ int i =0;while(true) {if (i++ > 10) break; // 只消費(fèi)10條數(shù)據(jù) /* 消費(fèi)消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費(fèi)者] " + rd.key() + "--" + rd.value()); }} /* 關(guān)閉消費(fèi)者 */ consumer.close(); }總結(jié)
以上是生活随笔為你收集整理的kafak消费者从头开始消费(消费者组)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: (转)Kafka 消费者 Java 实现
- 下一篇: Win11怎么投影到电脑?Win11开启