(转)Kafka 消费者 Java 实现
轉自:
Kafka 消費者 Java 實現 - 簡書應用程序使用 KafkaConsumer向 Kafka 訂閱 Topic 接收消息,首先理解 Kafka 中消費者(consumer)和消費者組(consumer group...https://www.jianshu.com/p/1f9e18e926f6據原文作者,以下內容總結自 《kafka權威指南》;
應用程序使用 KafkaConsumer向 Kafka 訂閱 Topic 接收消息,首先理解 Kafka 中消費者(consumer)和消費者組(consumer group)的概念和特性。
KafkaConsumer
消費者和消費者組
當生產者向 Topic 寫入消息的速度超過了消費者(consumer)的處理速度,導致大量的消息在 Kafka 中淤積,此時需要對消費者進行橫向伸縮,用多個消費者從同一個主題讀取消息,對消息進行分流。
Kafka 的消費者都屬于消費者組(consumer group)。一個組中的 consumer 訂閱同樣的 topic,每個 consumer 接收 topic 一些分區(partition)中的消息。同一個分區不能被一個組中的多個 consumer 消費。
假設現在有一個 Topic 有4個分區,有一個消費者組訂閱了這個 Topic,隨著組中的消費者數量從1個增加到5個時,Topic 中分區被讀取的情況:
[picture1]
?
Kafka consumers
如果組中 consumer 的數量超過分區數,多出的 consumer 會被閑置。因此,如果想提高消費者的并行處理能力,需要設置足夠多的 partition 數量。
除了通過增加 consumer 來橫向伸縮單個應用程序外,還會出現多個應用程序從同一個 Topic 讀取數據的情況。這也是 Kafka 設計的主要目標之一:讓 Topic 中的數據能夠滿足各種應用場景的需求。
如果要每個應用程序都可以獲取到所有的消息,而不只是其中的一部分,只要保證每個應用程序有自己的 consumer group,就可以獲取到 Topic 所有的消息:
[picture2]
?
Kafka consumer groups
橫向伸縮 Kafka 消費者和消費者群組并不會對性能造成負面影響。
分區再均衡
一個消費者組內的 consumer 共同讀取 Topic 的分區。
分區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡(rebalance)。再均衡非常重要,為消費者組帶來了高可用性和伸縮性,可以放心的增加或移除消費者。
再均衡期間,消費者無法讀取消息,造成整個 consumer group 一小段時間的不可用。另外,當分區被重新分配給另一個消費者時,當前的讀取狀態會丟失。
消費者通過向作為組協調器(GroupCoordinator)的 broker(不同的組可以有不同的協調器)發送心跳來維持和群組以及分區的關系。心跳表明消費者在讀取分區里的消息。消費者會在輪詢消息或提交偏移量(offset)時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,組協調器認為消費者已經死亡,會觸發一次再均衡。
在 Kafka 0.10.1 的版本中,對心跳行為進行了修改,由一個獨立的線程負責心跳。
消費 Kafka
創建 Kafka 消費者
在讀取消息之前,需要先創建一個 KafkaConsumer 對象。創建 KafkaConsumer 對象與創建 KafkaProducer 非常相似,創建 KafkaConsumer 示例:
Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092, broker2:9092"); // group.id,指定了消費者所屬群組 props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);訂閱主題
創建了消費者之后,需要訂閱 Topic,subscribe() 方法接受一個主題列表作為參數:
// topic name is “customerCountries” consumer.subscribe(Collections.singletonList("customerCountries"));subscribe() 也可以接收一個正則表達式,匹配多個主題(如果有新的名稱匹配的主題創建,會立即觸發一次再均衡,消費者就可以讀取新添加的主題)。在 Kafka 和其他系統之間復制數據時,使用正則表達式的方式訂閱多個主題是很常見的做法。
// 訂閱所有 test 前綴的 Topic: consumer.subscribe("test.*");消息輪詢
消息輪詢是消費者的核心,通過輪詢向服務器請求數據。消息輪詢 API 會處理所有的細節,包括群組協調、分區再均衡、發送心跳和獲取數據,開發者只需要處理從分區返回的數據。消費者代碼的主要部分如下所示:
try {while (true) {// 100 是超時時間(ms),在該時間內 poll 會等待服務器返回數據ConsumerReccords<String, String> records = consumer.poll(100); // poll 返回一個記錄列表。// 每條記錄都包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。for (ConsumerReccord<String, String> record : records) {log.debug("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",record.topic(), record.partition(), record.offset(), record.key(), record.value());int updatedCount = 1;if (custCountryMap.countainsValue(record.value())) {updatedCount = custCountryMap.get(record.value() ) + 1; custCountryMap.put(record.value(), updatedCount);JSONObject json = new JSONObject(custCountryMap);System.out.println(json.toString());}} } finally {// 關閉消費者,網絡連接和 socket 也會隨之關閉,并立即觸發一次再均衡consumer.close(); }在第一次調用新消費者的 poll() 方法時,會負責查找 GroupCoordinator,然后加入群組,接受分配的分區。如果發生了再均衡,整個過程也是在輪詢期間進行的。心跳也是從輪詢里發送出去的。
消費者配置
Kafka 與消費者相關的配置大部分參數都有合理的默認值,一般不需要修改,不過有一些參數與消費者的性能和可用性有很大關系。接下來介紹這些重要的屬性。
1. fetch.min.bytes
指定消費者從服務器獲取記錄的最小字節數。服務器在收到消費者的數據請求時,如果可用的數據量小于 fetch.min.bytes,那么會等到有足夠的可用數據時才返回給消費者。
合理的設置可以降低消費者和 broker 的工作負載,在 Topic 消息生產不活躍時,減少處理消息次數。如果沒有很多可用數據,但消費者的 CPU 使用率卻很高,需要調高該屬性的值。如果消費者的數量比較多,調高該屬性的值也可以降低 broker 的工作負載。
2. fetch.max.wait.ms
指定在 broker 中的等待時間,默認是500ms。如果沒有足夠的數據流入 Kafka,消費者獲取的數據量的也沒有達到 fetch.min.bytes,最終導致500ms的延遲。
如果要降低潛在的延遲(提高 SLA),可以調低該屬性的值。fetch.max.wait.ms 和 fetch.min.bytes 有一個滿足條件就會返回數據。
3. max.parition.fetch.bytes
指定了服務器從每個分區里返回給消費者的最大字節數,默認值是1MB。也就是說 KafkaConsumer#poll() 方法從每個分區里返回的記錄最多不超過 max.parition.fetch.bytes 指定的字節。
如果一個主題有20個分區和5個消費者(同一個組內),那么每個消費者需要至少4MB 的可用內存(每個消費者讀取4個分區)來接收記錄。如果組內有消費者發生崩潰,剩下的消費者需要處理更多的分區。
max.parition.fetch.bytes 必須比 broker 能夠接收的最大消息的字節數(max.message.size)大,否則消費者可能無法讀取這些消息,導致消費者一直重試。
另一個需要考慮的因素是消費者處理數據的時間。消費者需要頻繁調用 poll() 方法來避免會話過期和發生分區再均衡,如果單次調用 poll() 返回的數據太多,消費者需要更多的時間來處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況,可以把 max.parition.fetch.bytes 值改小或者延長會話過期時間。
4. session.timeout.ms
指定了消費者與服務器斷開連接的最大時間,默認是3s。如果消費者沒有在指定的時間內發送心跳給 GroupCoordinator,就被認為已經死亡,會觸發再均衡,把它的分區分配給其他消費者。
該屬性與 heartbeat.interval.ms 緊密相關,heartbeat.interval.ms 指定了 poll() 方法向協調器發送心跳的頻率,session.timeout.ms 指定了消費者最長多久不發送心跳。所以,一般需要同時修改這兩個屬性,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一,如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應該是 1s。
調低屬性的值可以更快地檢測和恢復崩潰的節點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡。調高屬性的值,可以減少意外的再均衡,不過檢測節點崩潰需要更長的時間。
5. auto.offset.reset
指定了消費者在讀取一個沒有偏移量(offset)的分區或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經過時井被刪除)該作何處理,默認值是 latest,表示在 offset 無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)。
另一個值是 earliest,消費者將從起始位置讀取分區的記錄。
6. enable.auto.commit
指定了消費者是否自動提交偏移量,默認值是 true,自動提交。
設為 false 可以程序自己控制何時提交偏移量。如果設為 true,需要通過配置 auto.commit.interval.ms 屬性來控制提交的頻率。
7. partition.assignment.strategy
分區分配給組內消費者的策略,根據給定的消費者和 Topic,決定哪些分區應該被分配給哪個消費者。Kafka 有兩個默認的分配策略:
-
Range,把 Topic 的若干個連續的分區分配給消費者。
假設 consumer1 和 consumer2(c1、c2 代替)訂閱了 topic1 和 topic2(t1、t2 代替),每個 Topic 都有3個分區。那么 c1 可能分配到 t1-part-0、t1-part-1、t2-part-0 和 t2-part1,而 c2 可能分配到 t1-part-2 和 t2-part-2。只要使用了 Range 策略,而且分區數量無法被消費者數量整除,就會出現這種情況。 -
RoundRobin,把所有分區逐個分配給消費者。
上面的例子如果使用 RoundRobin 策略,那么 c1 可能分配到 t1-part-0、t1-part-2 和 t2-part-1,c2 可能分配到 t1-part-1、t2-part-0 和 t2-part-2。一般來說,RoundRobin 策略會給所有消費者分配大致相同的分區數。
默認值是 org.apache.kafka.clients.consumer.RangeAssignor,這個類實現了 Range 策略,org.apache.kafka.clients.consumer.RoundRobinAssignor 是 RoundRobin 策略的實現類。還可以使用自定義策略,屬性值設為自定義類的名字。
8. client.id
broker 用來標識從客戶端發送過來的消息,可以是任意字符串,通常被用在日志、度量指標和配額中。
9. max.poll.records
用于控制單次調用 call() 方法能夠返回的記錄數量,幫助控制在輪詢里需要處理的數據量。
10. receive.buffer.bytes 和 send.buffer.bytes
分別指定了 TCP socket 接收和發送數據包的緩沖區大小。如果設為-1就使用操作系統的默認值。如果生產者或消費者與 broker 處于不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
提交和偏移量
每次調用 poll() 方法,總是返回 Kafka 中還沒有被消費者讀取過的記錄,使用偏移量(offset)來記錄消費者讀取的分區的位置。
更新分區當前位置的操作叫做“提交(commit)”,消費者是如何提交偏移量的呢?
消費者向一個特殊的 Topic:_consumer_offset 發送消息,消息包含每個分區的偏移量。偏移量只有在消費者發生崩潰或者有新的消費者加入群組觸發再均衡時有用。完成再均衡之后,消費者可能分配到新的分區,為了能夠繼續之前的工作,消費者需要讀取每個分區最后一次提交的 offset,然后從 offset 指定的地方繼續處理。
如果提交的 offset 大于客戶端處理的最后一個消息偏移量,那么處于兩個偏移量之間的消息會丟失。反之則會消息重復。
[picture3]
?
消息丟失
[picture4]
?
消息重復
所以,處理偏移量的方式對應用程序會有很大的影響。KafkaConsumer API 提供了多種方式來提交偏移量。
自動提交
最簡單的方式是消費者自動提交偏移量。如果 enable.auto.commit 設為 true,那 么每過一定時間間隔,消費者會自動把從 poll() 方法接收到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認是5s。
自動提交是在輪詢里進行的。消費者每次在進行輪詢時會檢查是否需要提交偏移量,如果是,那么會提交從上一次輪詢返回的偏移量。
假設我們使用默認的5s提交時間間隔,在最近一次提交之后的3s發生了再均衡,再 均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落后了3s,這3s內的數據已經處理過,再次消費是還會獲取到。通過調低提交時間間隔來更頻繁地提交偏移量,減小可能出現重復消費的時間窗,不過這種情況是無法完全避免的。
在使用自動提交時,每次調用輪詢方法都會把上一次調用返回的偏移量提交上去,并不 知道具體哪些消息已經被處理了,所以在再次調用之前最好確保所有當前調用返回的消息都已經處理完畢(在調用 close() 方法前也會進行自動提交)。
在處理異常或提前退出輪詢時要格外小心。自動提交雖然方便,不過并沒有為開發者留有余地來避免重復處理消息。
提交當前偏移量
KafkaConsumer API 提供的另一種提交偏移量的方式,程序主動觸發提交當前偏移量,而不是基于時間間隔自動提交。
把 auto.commit.offset 設為 false,使用 commitSync() 方法提交偏移量最簡單也最可靠,該方法會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
需要注意,commitSync() 將會提交 poll() 返回的最新偏移量,在處理完所有記錄后調用 commitSync(),否則還是會有丟失消息的風險。
commitSync() 提交偏移量的例子:
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s", record.topic(), record.partition(), record.offset(),record.key(), record.value());// 處理消息的邏輯省略}try {// poll 的數據全部處理完提交consumer.commitSync();} catch (CommitFailedException e) {log.error("commit failed", e)} }只要沒有發生不可恢復的錯誤,commitSync() 會一直嘗試直至提交成功。如果提交 失敗會拋出 CommitFailedException 異常。
異步提交
手動提交有一個不足之處,在 broker 對提交請求作出回應之前,應用程序會阻塞,這會影響應用程序的吞吐量。可以使用異步提交的方式,不等待 broker 的響應。
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s", record.topic(), record.partition(), record.offset(),record.key(), record.value());}// 異步提交consumer.commitAsync(); }在成功提交或發生無法恢復的錯誤之前,commitSync() 會一直嘗試直至提交成功,但是 commitAsync() 不會,這也是該方法的一個問題。之所以不進行重試,是因為在收到服務器響應之前,可能有一個更大的偏移量已經提交成功。
假設我們發出一個請求提交偏移量2000,這個時候發生了短暫的通信問題,服務器收不到請求,與此同時,程序處理了另外一批消息,并成功提交了偏移量3000。如果 commitAsync() 重新嘗試提交偏移量2000,有可能將偏移量3000改為2000,這個時候如果發生再均衡,就會出現重復消息。
commitAsync() 支持回調,在 broker 作出響應時會執行回調。回調經常被用于記錄提交錯誤或生成度量指標,如果要用它來進行重試,一定要注意提交的順序。
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",record.topic(), record.partition(), record.offset(), record.key(), record.value());}consumer.commitAsync(new OffsetCommitCallback() {// 提交完成時回回調此函數public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (e != null)log.error("Commit failed for offsets {}", offsets, e);}}); }重試異步提交
可以使用一個單調遞增的序列號來維護異步提交的順序。在每次提交偏移量之后或在回調里提交偏移量時遞增序列號。在進行重試前,先檢查回調的序列號和即將提交的偏移量是否相等,如果相等,說明沒有新的提交,那么可以安全地進行重試。如果序列號比較大,說明有一個新的提交已經發送出去了,放棄重試。
同步與異步混合提交
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,如果因為臨時網絡問題導致的,那么后續的提交總會有成功的。但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。
在消費者關閉前一般會組合使用 commitAsync() 和 commitSync():
try {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",record.topic(), record.partition(), record.offset(), record.key(), record.value());}// 異步提交consumer.commitAsync();} } catch (Exception e) {log.error("Unexpected error", e); } finally {try {// 同步提交consumer.commitSync();} finally {consumer.close();} }在正常處理流程中,使用異步提交來提高性能,最后使用同步提交來保證位移提交成功。
提交特定的偏移量
一般提交偏移量的頻率與處理消息批次的頻率是一樣的。如果想要更頻繁地提交怎么辦?如果 poll() 方法返回一大批數據,為了避免因再均衡引起的重復處理整批消息,想要在批次中間提交偏移量該怎么辦?
這種情況無法通過調用 commitSync() 或 commitAsync() 來實現,只會提交最后一個偏移量,而此時該批次里的消息還沒有處理完。
KafkaConsumer API 允許在調用 commitSync() 和 commitAsync() 方法時傳進去希望提交的分區和偏移量的 map。因為消費者可能不只讀取一個分區,需要跟蹤所有分區的偏移量,所以在這個層面上控制偏移量的提交會讓代碼變復雜。
// 記錄分區的 offset 信息 Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); int count = 0;while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",record.topic(), record.partition(), record.offset(), record.key(), record.value());// 省略消息處理邏輯 ...// 記錄分區的 offsetcurrentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));// 最多每處理 1000 條記錄就提交一次偏移量if (count % 1000 == 0)consumer.commitAsync(currentOffsets, null);count++;} }再均衡監聽器
消費者在退出和進行分區再均衡之前,如果消費者知道要失去對一個分區的所有權,它可能需要提交最后一個已處理記錄的偏移量。KafakConsumer API 可以在消費者新增分區或者失去分區時進行處理,在調用 subscribe() 方法時傳入 ConsumerRebalanceListener 對象,該對象有兩個方法:
-
public void onPartitionRevoked(Collection partitions)
在消費者停止消費消費后,在再均衡開始前調用。 -
public void onPartitionAssigned(Collection partitions)
在分區分配給消費者后,在消費者開始讀取消息前調用。
下面來看一個的例子,在消費者失去某個分區時提交 offset,以便其他消費者可以接著消費消息并處理:
// 記錄分區的 offset 信息 Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();class HandleRebalance implements ConsumerRebalanceListener {public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}// 如果發生再均衡,即將失去分區所有權時提交偏移量。// 提交的是最近處理過的偏移量,而不是批次中還在處理的最后一個偏移量。public void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);consumer.commitSync(currentOffsets);} }// ...try {// 把 ConsumerRebalanceListener 對象傳給 subscribe() 方法consumer.subscribe(topics, new HandleRebalance());while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));}consumer.commitAsync(currentOffsets, null);} } catch (WakeupException e) {// ignore } catch (Exception e) {log.error("Unexpected error", e); } finally {try {consumer.commitSync(currentOffsets);} finally {consumer.close();} }從指定位移開始消費
除了讀取最近一次提交的位置開始消費數據,有時候也需要從特定的偏移量處開始讀取消息。
如果想從分區起始位置開始消費,可以使用 seekToBeginning(TopicPartition tp);如果想從分區的最末端消費最新的消息,可以使用 seekToEnd(TopicPartition tp)。Kafka 還支持從指定 offset 處開始消費。最典型的一個是:offset 維護在其他系統(例如數據庫)中,并且以其他系統的值為準。
考慮下面的場景:從 Kafka 中讀取消息進行處理,最后把結果寫入數據庫,可能會按如下邏輯處理:
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());processRecord(record);storeRecordInDB(record);consumer.commitAsync(currentOffsets);} }看似正確的邏輯要注意的是,在持久化到數據庫成功后,提交位移到 Kafka 可能會失敗,出現不一致的情況,那么這可能會導致消息會重復處理。對于這種情況,我們需要將持久化到數據庫與提交 offset 實現為原子性操作,最簡單的做法,在保存記錄到數據庫的同時保存 offset 信息,在消費者開始消費時指定數據庫的 offset 開始消費。
只需要通過 seek() 來指定分區位移開始消費即可:
class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {public void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 在分區被回收前提交數據庫事務,保存消費的記錄和位移commitDBTransaction();}public void onPartitionsAssigned(Collection<TopicPartition> partitions) {// 在開始消費前,從數據庫中獲取分區的位移,使用 seek() 指定開始消費的偏移量for(TopicPartition partition: partitions)consumer.seek(partition, getOffsetFromDB(partition));} }// ...consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer)); // 調用一次 poll() 方怯,讓消費者加入到消費者群組里,并獲取分配到的分區 consumer.poll(0);// 然后馬上調用 seek() 方法定位分區的偏移量。 // seek() 方法只更新我們正在使用的位置,在下一次調用 poll() 時就可以獲得正確的消息。 // 如果 seek() 發生錯誤, poll() 就會拋出異常。 for (TopicPartition partition: consumer.assignment())consumer.seek(partition, getOffsetFromDB(partition));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {processRecord(record);// 保存記錄結果storeRecordInDB(record);// 保存位移信息storeOffsetInDB(record.topic(), record.partition(), record.offset());}// 提交數據庫事務commitDBTransaction(); }優雅退出
一般情況下,在主線程中循環 poll() 消息并進行處理。當需要退出循環時,使用另一個線程調用 consumer.wakeup(),會使得 poll() 拋出 WakeupException。如果主線程正在處理消息,那么在下一次主線程調用 poll() 時會拋出異常。樣例代碼:
// 注冊 JVM 關閉時的回調,當 JVM 關閉時調用 Runtime.getRuntime().addShutdownHook(new Thread() {public void run() {System.out.println("Starting exit...");// 調用消費者的 wakeup 方法通知主線程退出consumer.wakeup();try {// 等待主線程退出mainThread.join();} catch (InterruptedException e) {e.printStackTrace();}} });...// 消費主線程 try {while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {// ...}consumer.commitSync();} } catch (WakeupException e) {// ignore } finally {consumer.close(); }消息序列化
Kafka 生產者將對象序列化成字節數組并發送到服務器,消費者需要將字節數組轉換成對象(反序列化)。序列化與反序列化需要匹配,與生產者類似,推薦使用 Avro 序列化方式。
使用 Avro 反序列化
樣例代碼如下(與生產者實現類似):
Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); props.put("schema.registry.url", schemaUrl); String topic = "customerContacts"KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url)); consumer.subscribe(Collections.singletonList(topic));System.out.println("Reading topic:" + topic);while (true) {// 這里使用之前生產者使用的Avro生成的Customer類ConsumerRecords<String, Customer> records = consumer.poll(1000);for (ConsumerRecord<String, Customer> record: records) {System.out.println("Current customer name is: " + record.value().getName());}consumer.commitSync(); }獨立消費者
一般情況下都是使用消費者組(即使只有一個消費者)來消費消息的,這樣可以在增加或減少消費者時自動進行分區重平衡,這種方式是推薦的。
在知道主題和分區的情況下,也可以使用單個消費者來進行消費,需要實現給消費者分配分區,而不是讓消費者訂閱主題。代碼樣例:
// 獲取主題下所有的分區 List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");if (partitionInfos != null) {for (PartitionInfo partition : partitionInfos)partitions.add(new TopicPartition(partition.topic(), partition.partition()));// 為消費者指定分區consumer.assign(partitions);while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record: records) {// ...}consumer.commitSync();} }除了需要主動獲取分區以及沒有分區重平衡,其他的處理邏輯是一樣的。需要注意的是,如果添加了新的分區,這個消費者是感知不到的,需要通過 consumer.partitionsFor() 來重新獲取分區。
《Kafka權威指南》
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的(转)Kafka 消费者 Java 实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怪物猎人电脑版怎么下载(怪物猎人pc版下
- 下一篇: kafak消费者从头开始消费(消费者组)