深入分析Kafka生产者和消费者
深入Kafka生產(chǎn)者和消費者
- Kafka生產(chǎn)者
- 消息發(fā)送的流程
- 發(fā)送方式
- 發(fā)送并忘記
- 同步發(fā)送
- 異步發(fā)送
- 生產(chǎn)者屬性配置
- 序列化器
- 分區(qū)器
- 自定義分區(qū)器
- Kafka消費者
- 消費者屬性配置
- 消費者基礎概念
- 消費者群組
- 訂閱主題
- 輪詢拉取
- 提交和偏移量
- 提交偏移量帶來的問題
- 自動提交
- 手動提交
- 異步提交
- 同步和異步提交
- 特定提交
- 消費者核心概念
- 群組協(xié)調(diào)
- 分區(qū)再均衡
- 再均衡監(jiān)聽器
- 從特定偏移量處開始記錄
- 優(yōu)雅退出
- 反序列化器
- 獨立消費者
Kafka生產(chǎn)者
消息發(fā)送的流程
生產(chǎn)者每發(fā)送一條消息都需要先創(chuàng)建一個ProducerRecord對象,并且需要指定目標主題、消息內(nèi)容,當然還可以指定消息鍵和分區(qū)。之后就會調(diào)用send()方法發(fā)送該對象,由于生產(chǎn)者需要與Kafka Broker建立網(wǎng)絡傳輸,必然需要先通過序列化器對消息的鍵和值對象序列化成字節(jié)數(shù)組,才能進行傳輸。
之后,分區(qū)器就會接收到數(shù)據(jù),然后先確認ProducerRecord對象中是否指定了分區(qū),如果指定分區(qū)那么就直接把指定的分區(qū)返回,如果未指定分區(qū),分區(qū)器就會根據(jù)消息鍵進行選擇分區(qū)。確認分區(qū)后,那么消息才能確認發(fā)送到哪個主題的哪個分區(qū)上。接下來,消息又會被添加到批次里,同一個批次的消息總是發(fā)送到同一個主題和分區(qū)上,最后生產(chǎn)者端會有一個獨立線程負責將批次發(fā)送到相應的Broker上。
Broker接收到消息后會進行響應,消息寫入成功,會返回生產(chǎn)者端一個RecordMetaData對象,這個對象記錄了消息在哪個主題的分區(qū)上,同時還記錄了消息在分區(qū)中的偏移量。消息如果寫入失敗,則會返回一個錯誤,而生產(chǎn)者會根據(jù)配置的重試次數(shù)進行重試,當超過重試次數(shù)還是失敗,就會將錯誤信息返回給生產(chǎn)者端。
發(fā)送方式
發(fā)送并忘記
producer.send(record);//忽略返回值同步發(fā)送
//接收返回值 Future<RecordMetadata> future = producer.send(record); //調(diào)用get方法進行阻塞,獲取結果 RecordMetadata recordMetadata = future.get();異步發(fā)送
//發(fā)送時,指定Callback回調(diào) producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata,Exception exception) {if(null!=exception){//異常處理}if(null!=metadata){System.out.println("message offset:"+metadata.offset()+" "+"message partition:"+metadata.partition());}} );生產(chǎn)者屬性配置
創(chuàng)建KafkaProducer時都需要為其指定屬性,屬性的配置可以參考org.apache.kafka.clients.producer 包下的 ProducerConfig 類,大部分屬性都配置了合理的默認值,如果對內(nèi)存使用、性能和可靠性方面有要求可以相應調(diào)整一些屬性,下面介紹一些常用的配置屬性:
acks=0:生產(chǎn)者在成功寫入消息之前是不會等待任何的來自服務器的響應。如果在此期間出現(xiàn)了異常,造成Broker沒能收到消息,而此時生產(chǎn)者又得不到反饋,消息也就丟失了。但是因為生產(chǎn)者不需要去等待服務器的響應,吞吐量相對更高;
acks=1:只要集群中分區(qū)的首領節(jié)點接收到消息,生產(chǎn)者就會收到來自服務器的成功響應。如果消息無法到達首領節(jié)點,生產(chǎn)者會收到一個錯誤響應,為了避免數(shù)據(jù)丟失,生產(chǎn)者會重發(fā)消息。不過,如果一個沒有收到消息的節(jié)點成為新首領,消息還是會丟失。缺省使用這個配置;
acks=all:只有當集群中所有的分區(qū)副本都接收到消息后,生產(chǎn)者才會受到一個來自服務器的成功響應。
序列化器
創(chuàng)建KafkaProducer對象時,必須指定鍵和值的序列化器,一些業(yè)務場景可能需要自定義序列化器,那么只需要實現(xiàn)org.apache.kafka.common.serialization.Serializer 接口,重寫serialize()方法定義序列化邏輯即可。但自定義序列化器可能更多會去結合特定業(yè)務場景使用,所以容易導致程序的脆弱性,如果需求做了調(diào)整相應的序列化器實現(xiàn)也可能需要調(diào)整。因此使用序列化器更推薦使用自帶格式描述以及語言無關的序列化框架,比如Kafka 官方推薦的 Apache Avro。
Avro在文件的讀寫是依據(jù)schema而進行的,而schema是通過一個JSON文件進行描述數(shù)據(jù)的,可以把這個schema 內(nèi)嵌在數(shù)據(jù)文件中。這樣,不管數(shù)據(jù)格式如何變動,消費者都知道如何處理數(shù)據(jù)。但是內(nèi)嵌的消息,自帶格式,會導致消息的大小不必要的增大,消耗了資源。我們可以使用 schema 注冊表機制,將所有寫入的數(shù)據(jù)用到的 schema 保存在注冊表中,然后在消息中引用 schema 的標識符,而讀取的數(shù)據(jù)的消費者程序使用這個標識符從注冊表中拉取 schema 來反序列化記錄。
分區(qū)器
生產(chǎn)者在發(fā)送消息時需要創(chuàng)建ProducerRecord對象,ProducerRecord對象可以指定一個消息鍵。指定了消息鍵,那么分區(qū)器就會將擁有相同鍵的消息指定給同一個主題的同一個分區(qū)。如果沒有指定消息鍵,那么會通過默認分區(qū)器,使用輪詢算法將消息均衡發(fā)布到主題下的各個分區(qū)。默認分區(qū)器會對消息鍵進行散列,然后根據(jù)散列值將消息映射到特定的分區(qū)上,這樣同一個消息鍵總是能夠被映射到同一個分區(qū),但是只有不改變主題分區(qū)數(shù)量的情況下,鍵和分區(qū)之間的映射才能保持不變,一旦增加了新的分區(qū),就無法保證了,所以如果要使用鍵來映射分區(qū),那就要在創(chuàng)建主題的時候把分區(qū)規(guī)劃好,不要增加新分區(qū)。
自定義分區(qū)器
一些業(yè)務場景中數(shù)據(jù)可能會有側重,比如按地區(qū)進行劃分數(shù)據(jù)時,不同地區(qū)的消息量是不同的,那么這種情況下就可以根據(jù)消息值中的一些標識,去針對消息值進行做分區(qū),會更適合對應的業(yè)務場景。自定義一個分區(qū)器只需要去實現(xiàn)org.apache.kafka.clients.producer.Partitioner該接口,重寫partition()方法完成相應的分區(qū)邏輯。
Kafka消費者
消費者屬性配置
消費者需要創(chuàng)建KafkaConsumer,創(chuàng)建該對象時也需要指定消費者相關屬性,可以參考org.apache.kafka.clients.consumer 包下 ConsumerConfig 類,大部分屬性都配置了合理的默認值,如果需要關注內(nèi)存使用、性能和可靠性方面可以相應調(diào)整一些屬性,下面介紹一些常用的配置屬性:
消費者基礎概念
消費者群組
在一些高并發(fā)的情況下,當Kafka生產(chǎn)者發(fā)送消息的速度遠快于消費者消費速度時,如果只配置單個消費者,容易造成消息堆積,消息不能及時處理。這種情況下通常考慮的就是對消費者進行橫向伸縮,通過增加消費者個數(shù)對同一個主題多個分區(qū)的消息進行分流。而Kafka中多個消費者通常會構成一個消費者群組,往群組中增加消費者是進行橫向伸縮的主要方式。
在一個消費者群組中所有消費者都是訂閱同一個主題,主題下一個分區(qū)只能由一個消費者消費,而一個消費者可以消費多個分區(qū)。
訂閱主題
//消費者訂閱主題(可以多個),主題值允許使用正則表達式 consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));消費端創(chuàng)建KafkaConsumer對象后,會使用subscribe()方法進行訂閱主題,而一個消費者是可以訂閱多個主題的,該方法可以傳遞一個主題列表或者正則表達式作為參數(shù)。正則表達式也能夠匹配多個主題,比如,想訂閱所有order相關的主題,可以使用subscribe(“order.*”) 。
需要注意: 在通過正則表達式訂閱主題時,如果新建的一個主題正好與表達式匹配,那么會立即觸發(fā)一次再均衡,消費者就可以讀取新添加的主題了。
輪詢拉取
//輪詢獲取消息 while(true){//拉取ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic:%s,分區(qū):%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));} }Kafka消費端是通過拉取的方式獲取消息,消費者為了不斷獲取消息,只能在循環(huán)中不斷調(diào)用poll()方法進行拉取。其中poll()方法需要指定超時時間,它會讓消費者在指定的毫秒數(shù)內(nèi)一直等待 broker 返回數(shù)據(jù)。poll()方法會返回一個ConsumerRecords列表對象,而其中每一個ConsumerRecord對象都包含了消息所屬的主題信息、所在分區(qū)信息、在分區(qū)里的偏移量,以及鍵值對。
提交和偏移量
消費者可以使用 Kafka來追蹤消息在分區(qū)里的位置,稱之為偏移量。消費者更新自己讀取到哪個消息的操作,稱之為提交。消費者提交偏移量本質上就是向一個_consumer_offset 的特殊主題發(fā)送一個消息,里面會包括每個分區(qū)的偏移量。
提交偏移量帶來的問題
如果提交的偏移量小于消費者實際處理的最后一個消息的偏移量,處于兩個偏移量之間的消息會被重復處理。
如果提交的偏移量大于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失。
自動提交
auto.commit. offset缺省情況下為true,消費者會自動提交偏移量,自動提交存在一個時間間隔由auto.commit.interval.ms進行控制,缺省為5s。自動提交是在輪詢拉取過程中觸發(fā)的,消費者每次輪詢時都會檢查是否提交偏移量,如果是,則會將poll()方法返回的最新偏移量進行提交。
注意:自動提交由于是基于時間間隔的提交,如果在未達到提交時間時觸發(fā)了分區(qū)再均衡,就容易造成在此之前一部分已經(jīng)處理的消息被其它消費者重復處理了。并且自動提交總是將poll()方法返回的最新偏移量進行提交,它并不知道哪些消息處理成功了,所以再次調(diào)用之前最好確保所有當前調(diào)用poll()方法返回的消息都處理完成,否則可能造成消息丟失。
手動提交
將auto.commit. offset設置為false,然后調(diào)用commitsync()方法提交偏移量。這個方法會提交調(diào)用poll()方法返回的最新偏移量,只要沒有發(fā)生不可恢復的錯誤,該方法會一直阻塞,直到提交成功后返回,如果提交失敗就會拋出異常。
注意:手動提交由于也是提交poll()方法返回的最新偏移量,所以在處理完所有的消息后要確保調(diào)用了commitsync()方法,否則可能造成消息丟失。
異步提交
調(diào)用commitAsync()方法進行異步提交,相比與手動提交,它不會使應用程序阻塞,無需等待Broker響應。并且它支持回調(diào),能夠在Broker響應時執(zhí)行相應回調(diào)方法。
//異步提交偏移量 consumer.commitAsync(); //支持回調(diào) consumer.commitAsync(new OffsetCommitCallback() {public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if(exception!=null){System.out.print("Commmit failed for offsets "+ offsets);}} });同步和異步提交
一般情況下,針對偶爾出現(xiàn)的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續(xù)的提交總會有成功的。但如果這是發(fā)生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。這個時候就需要使用同步異步組合提交。
try {while(true){ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic= %s,partition= %d,offset= %d,key= %s,value= %s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));}//每次輪詢進行異步提交consumer.commitAsync();} } finally {try {//同步提交下consumer.commitSync();} finally {consumer.close();} }特定提交
支持在批次中間進行提交偏移量,在調(diào)用 commitsync()和 commitAsync()方法時傳遞希望提交的分區(qū)和偏移量構成的一個Map參數(shù)。
Map<TopicPartition, OffsetAndMetadata> currOffsets= new HashMap<TopicPartition, OffsetAndMetadata>(); int count = 0; try {while(true){ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",record.topic(),record.partition(),record.offset(),record.key(),record.value()));currOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1,"null")); if(count%10==0){//特定提交,指定一個記錄希望提交的分區(qū)和偏移量的mapconsumer.commitAsync(currOffsets,null);}count++;} } } finally {try {//同步提交consumer.commitSync();} finally {consumer.close();} }消費者核心概念
群組協(xié)調(diào)
消費者要加入群組時,會向群組協(xié)調(diào)器發(fā)送一個 JoinGroup 請求,第一個加入群主的消費者成為群主,群主會獲得群組的成員列表,并負責給每一個消費者分配分區(qū)。分配完畢后,群主把分配情況發(fā)送給群組協(xié)調(diào)器,協(xié)調(diào)器再把這些信息發(fā)送給所有的消費者,每個消費者只能看到自己的分配信息, 只有群主知道群組里所有消費者的分配信息。群組協(xié)調(diào)的工作會在消費者發(fā)生變化,主題中分區(qū)發(fā)生了變化時發(fā)生。
分區(qū)再均衡
在Kafka中,消費者群組中存在著消費者對分區(qū)的所有權關系,這樣在一個群組中如果新增一個消費者,那么新的消費者會分配到原先由其他消費者讀取的分區(qū),而減少一個消費者,那原本由它負責的分區(qū)就會分配給其它消費者。除此之外,如果增加了分區(qū),新增的分區(qū)也需劃分由哪個消費者讀取,這一系列的行為,都會導致分區(qū)所有權的變化,這種變化就稱為分區(qū)再均衡。
在消費者群組中我們介紹了它有一個群組協(xié)調(diào)器,而群組協(xié)調(diào)器它會接收群組中每個消費者發(fā)來的心跳,然后維持每個消費者和群組的從屬關系以及對分區(qū)所有權關系。如果長時間未收到消費者發(fā)送的心跳,群組協(xié)調(diào)器就會認為當前消費者已經(jīng)死亡,就會觸發(fā)一次再均衡。
分區(qū)再均衡在Kafka中是非常重要的,這是消費者群組帶來高可用性和伸縮性的關鍵所在。但是發(fā)生分區(qū)再均衡的期間,消費者會無法接收到消息,會造成整個群組一段時間的不可用,因此都需要盡量減少發(fā)生分區(qū)再均衡。
再均衡監(jiān)聽器
消費者調(diào)用subscribe()訂閱主題時,指定一個ConsumerRebalanceListener,在再均衡開始之前和分區(qū)再均衡完成之后做一些操作。
//指定一個ConsumerRebalancelistener consumer.subscribe(Collections.singletonList("test1"), new ConsumerRebalanceListener() {//分區(qū)再均衡之前@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {//1、將偏移量提交到Kafka//2、偏移量寫入數(shù)據(jù)庫}//分區(qū)再均衡完成以后@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//1、從數(shù)據(jù)庫中獲取偏移量//2、通過seek()方法從指定偏移量位置開始讀取} });從特定偏移量處開始記錄
通常情況下消費者沒有通過seek()方法指定讀取位置時,調(diào)用poll()方法默認都會從分區(qū)的最新偏移量處開始讀取消息。當然如果想從分區(qū)的起始位置開始讀取消息,或者直接跳到分區(qū)的末尾開始讀取消息,可以使 seekToBeginning(Collection tp)和 seekToEnd( Collectiontp)這兩個方法。而調(diào)用seek()是可以從從特定的偏移量處開始讀取消息的。
//從指定分區(qū)中的指定偏移量開始消費 consumer.seek(topicPartition,2);優(yōu)雅退出
如果確定要退出循環(huán),需要通過另一個線程調(diào)用 consumer. wakeup()方法。如果循環(huán)運行在主線程里,可以在 ShutdownHook 里調(diào)用該方法。要記住, consumer. wakeup()是消費者唯一一個可以從其他線程里安全調(diào)用的方法。
反序列化器
創(chuàng)建KafkaConsumer對象時需要指定反序列化器,將從Kafka接收到的字節(jié)數(shù)組轉換成 java對象,發(fā)送消息指定的序列化器必須與接收消息使用的反序列化器一一對應的。一些業(yè)務場景可能需要自定義反序列化器,那么只需要實現(xiàn)org.apache.kafka.common.serialization.Deserializer接口,重寫deserialize()方法定義反序列化邏輯即可。
獨立消費者
一個消費者從一個主題的所有分區(qū)或者某個特定的分區(qū)讀取數(shù)據(jù),不需要消費者群組和再均衡,只需要把主題或者分區(qū)分配給消費者,然后開始讀取消息并提交偏移量。
Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //獨立消費者(不需要訂閱主題,只需要分配主題中分區(qū)即可) KafkaConsumer<String,String> consumer= new KafkaConsumer<String, String>(properties); //拿到主題的分區(qū)信息 List<PartitionInfo> partitionInfos = consumer.partitionsFor("independ-consumer"); List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>(); if(null!=partitionInfos){for(PartitionInfo partitionInfo:partitionInfos){topicPartitionList.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));} } //獨立消費者需要執(zhí)行哪些分區(qū)(這里全部的分區(qū)分配給一個消費者) consumer.assign(topicPartitionList); try {while(true){ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("主題:%s,分區(qū):%d,偏移量:%d,key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));}} } finally {consumer.close(); }總結
以上是生活随笔為你收集整理的深入分析Kafka生产者和消费者的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安恒5月赛BJDCTF3th-逆向
- 下一篇: 谁创造了硅谷?仙童半导体“叛逆八人”