Kafka消费者APi
Kafka客戶端從集群中消費(fèi)消息,并透明地處理kafka集群中出現(xiàn)故障服務(wù)器,透明地調(diào)節(jié)適應(yīng)集群中變化的數(shù)據(jù)分區(qū)。也和服務(wù)器交互,平衡均衡消費(fèi)者。
public class KafkaConsumer<K,V> extends Object implements Consumer<K,V>消費(fèi)者TCP長(zhǎng)連接到broker來(lái)拉取消息。故障導(dǎo)致的消費(fèi)者關(guān)閉失敗,將會(huì)泄露這些連接,消費(fèi)者不是線程安全的,可以查看更多關(guān)于Multi-threaded(多線程)處理的細(xì)節(jié)。
跨版本兼容性
該客戶端可以與0.10.0或更新版本的broker集群進(jìn)行通信。較早的版本可能不支持某些功能。例如,0.10.0broker不支持offsetsForTimes,因?yàn)榇斯δ苁窃诎姹?.10.1中添加的。 如果你調(diào)用broker版本不可用的API時(shí),將報(bào) UnsupportedVersionException 異常。
偏移量和消費(fèi)者的位置
kafka為分區(qū)中的每條消息保存一個(gè)偏移量(offset),這個(gè)偏移量是該分區(qū)中一條消息的唯一標(biāo)示符。也表示消費(fèi)者在分區(qū)的位置。例如,一個(gè)位置是5的消費(fèi)者(說(shuō)明已經(jīng)消費(fèi)了0到4的消息),下一個(gè)接收消息的偏移量為5的消息。實(shí)際上有兩個(gè)與消費(fèi)者相關(guān)的“位置”概念:
消費(fèi)者的位置給出了下一條記錄的偏移量。它比消費(fèi)者在該分區(qū)中看到的最大偏移量要大一個(gè)。 它在每次消費(fèi)者在調(diào)用poll(long)中接收消息時(shí)自動(dòng)增長(zhǎng)。
“已提交”的位置是已安全保存的最后偏移量,如果進(jìn)程失敗或重新啟動(dòng)時(shí),消費(fèi)者將恢復(fù)到這個(gè)偏移量。消費(fèi)者可以選擇定期自動(dòng)提交偏移量,也可以選擇通過(guò)調(diào)用commit API來(lái)手動(dòng)的控制(如:commitSync 和 commitAsync)。
這個(gè)區(qū)別是消費(fèi)者來(lái)控制一條消息什么時(shí)候才被認(rèn)為是已被消費(fèi)的,控制權(quán)在消費(fèi)者,下面我們進(jìn)一步更詳細(xì)地討論。
消費(fèi)者組和主題訂閱
Kafka的消費(fèi)者組概念,通過(guò)進(jìn)程池瓜分消息并處理消息。這些進(jìn)程可以在同一臺(tái)機(jī)器運(yùn)行,也可分布到多臺(tái)機(jī)器上,以增加可擴(kuò)展性和容錯(cuò)性,相同group.id的消費(fèi)者將視為同一個(gè)消費(fèi)者組。
分組中的每個(gè)消費(fèi)者都通過(guò)subscribe API動(dòng)態(tài)的訂閱一個(gè)topic列表。kafka將已訂閱topic的消息發(fā)送到每個(gè)消費(fèi)者組中。并通過(guò)平衡分區(qū)在消費(fèi)者分組中所有成員之間來(lái)達(dá)到平均。因此每個(gè)分區(qū)恰好地分配1個(gè)消費(fèi)者(一個(gè)消費(fèi)者組中)。所有如果一個(gè)topic有4個(gè)分區(qū),并且一個(gè)消費(fèi)者分組有只有2個(gè)消費(fèi)者。那么每個(gè)消費(fèi)者將消費(fèi)2個(gè)分區(qū)。
消費(fèi)者組的成員是動(dòng)態(tài)維護(hù)的:如果一個(gè)消費(fèi)者故障。分配給它的分區(qū)將重新分配給同一個(gè)分組中其他的消費(fèi)者。同樣的,如果一個(gè)新的消費(fèi)者加入到分組,將從現(xiàn)有消費(fèi)者中移一個(gè)給它。這被稱為重新平衡分組,并在下面更詳細(xì)地討論。當(dāng)新分區(qū)添加到訂閱的topic時(shí),或者當(dāng)創(chuàng)建與訂閱的正則表達(dá)式匹配的新topic時(shí),也將重新平衡。將通過(guò)定時(shí)刷新自動(dòng)發(fā)現(xiàn)新的分區(qū),并將其分配給分組的成員。
從概念上講,你可以將消費(fèi)者分組看作是由多個(gè)進(jìn)程組成的單一邏輯訂閱者。作為一個(gè)多訂閱系統(tǒng),Kafka支持對(duì)于給定topic任何數(shù)量的消費(fèi)者組,而不重復(fù)。
這是在消息系統(tǒng)中常見的功能的略微概括。所有進(jìn)程都將是單個(gè)消費(fèi)者分組的一部分(類似傳統(tǒng)消息傳遞系統(tǒng)中的隊(duì)列的語(yǔ)義),因此消息傳遞就像隊(duì)列一樣,在組中平衡。與傳統(tǒng)的消息系統(tǒng)不同的是,雖然,你可以有多個(gè)這樣的組。但每個(gè)進(jìn)程都有自己的消費(fèi)者組(類似于傳統(tǒng)消息系統(tǒng)中pub-sub的語(yǔ)義),因此每個(gè)進(jìn)程都會(huì)訂閱到該主題的所有消息。
此外,當(dāng)分組重新分配自動(dòng)發(fā)生時(shí),可以通過(guò)ConsumerRebalanceListener通知消費(fèi)者,這允許他們完成必要的應(yīng)用程序級(jí)邏輯,例如狀態(tài)清除,手動(dòng)偏移提交等。有關(guān)更多詳細(xì)信息,請(qǐng)參閱Kafka存儲(chǔ)的偏移。
它也允許消費(fèi)者通過(guò)使用assign(Collection)手動(dòng)分配指定分區(qū),如果使用手動(dòng)指定分配分區(qū),那么動(dòng)態(tài)分區(qū)分配和協(xié)調(diào)消費(fèi)者組將失效。
發(fā)現(xiàn)消費(fèi)者故障
訂閱一組topic后,當(dāng)調(diào)用poll(long)時(shí),消費(fèi)者將自動(dòng)加入到組中。只要持續(xù)的調(diào)用poll,消費(fèi)者將一直保持可用,并繼續(xù)從分配的分區(qū)中接收消息。此外,消費(fèi)者向服務(wù)器定時(shí)發(fā)送心跳。 如果消費(fèi)者崩潰或無(wú)法在session.timeout.ms配置的時(shí)間內(nèi)發(fā)送心跳,則消費(fèi)者將被視為死亡,并且其分區(qū)將被重新分配。
還有一種可能,消費(fèi)可能遇到“活鎖”的情況,它持續(xù)的發(fā)送心跳,但是沒(méi)有處理。為了預(yù)防消費(fèi)者在這種情況下一直持有分區(qū),我們使用max.poll.interval.ms活躍檢測(cè)機(jī)制。 在此基礎(chǔ)上,如果你調(diào)用的poll的頻率大于最大間隔,則客戶端將主動(dòng)地離開組,以便其他消費(fèi)者接管該分區(qū)。 發(fā)生這種情況時(shí),你會(huì)看到offset提交失敗(調(diào)用commitSync()引發(fā)的CommitFailedException)。這是一種安全機(jī)制,保障只有活動(dòng)成員能夠提交offset。所以要留在組中,你必須持續(xù)調(diào)用poll。
消費(fèi)者提供兩個(gè)配置設(shè)置來(lái)控制poll循環(huán):
max.poll.interval.ms:增大poll的間隔,可以為消費(fèi)者提供更多的時(shí)間去處理返回的消息(調(diào)用poll(long)返回的消息,通常返回的消息都是一批)。缺點(diǎn)是此值越大將會(huì)延遲組重新平衡。
max.poll.records:此設(shè)置限制每次調(diào)用poll返回的消息數(shù),這樣可以更容易的預(yù)測(cè)每次poll間隔要處理的最大值。通過(guò)調(diào)整此值,可以減少poll間隔,減少重新平衡分組的
對(duì)于消息處理時(shí)間不可預(yù)測(cè)地的情況,這些選項(xiàng)是不夠的。 處理這種情況的推薦方法是將消息處理移到另一個(gè)線程中,讓消費(fèi)者繼續(xù)調(diào)用poll。 但是必須注意確保已提交的offset不超過(guò)實(shí)際位置。另外,你必須禁用自動(dòng)提交,并只有在線程完成處理后才為記錄手動(dòng)提交偏移量(取決于你)。 還要注意,你需要pause暫停分區(qū),不會(huì)從poll接收到新消息,讓線程處理完之前返回的消息(如果你的處理能力比拉取消息的慢,那創(chuàng)建新線程將導(dǎo)致你機(jī)器內(nèi)存溢出)。
示例
這個(gè)消費(fèi)者API提供了靈活性,以涵蓋各種消費(fèi)場(chǎng)景,下面是一些例子來(lái)演示如何使用它們。
自動(dòng)提交偏移量
這是個(gè)【自動(dòng)提交偏移量】的簡(jiǎn)單的kafka消費(fèi)者API。
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }設(shè)置enable.auto.commit,偏移量由auto.commit.interval.ms控制自動(dòng)提交的頻率。
集群是通過(guò)配置bootstrap.servers指定一個(gè)或多個(gè)broker。不用指定全部的broker,它將自動(dòng)發(fā)現(xiàn)集群中的其余的borker(最好指定多個(gè),萬(wàn)一有服務(wù)器故障)。
在這個(gè)例子中,客戶端訂閱了主題foo和bar。消費(fèi)者組叫test。
broker通過(guò)心跳機(jī)器自動(dòng)檢測(cè)test組中失敗的進(jìn)程,消費(fèi)者會(huì)自動(dòng)ping集群,告訴進(jìn)群它還活著。只要消費(fèi)者能夠做到這一點(diǎn),它就被認(rèn)為是活著的,并保留分配給它分區(qū)的權(quán)利,如果它停止心跳的時(shí)間超過(guò)session.timeout.ms,那么就會(huì)認(rèn)為是故障的,它的分區(qū)將被分配到別的進(jìn)程。
這個(gè)deserializer設(shè)置如何把byte轉(zhuǎn)成object類型,例子中,通過(guò)指定string解析器,我們告訴獲取到的消息的key和value只是簡(jiǎn)單個(gè)string類型。
手動(dòng)控制偏移量
不需要定時(shí)的提交offset,可以自己控制offset,當(dāng)消息認(rèn)為已消費(fèi)過(guò)了,這個(gè)時(shí)候再去提交它們的偏移量。這個(gè)很有用的,當(dāng)消費(fèi)的消息結(jié)合了一些處理邏輯,這個(gè)消息就不應(yīng)該認(rèn)為是已經(jīng)消費(fèi)的,直到它完成了整個(gè)處理。
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }在這個(gè)例子中,我們將消費(fèi)一批消息并將它們存儲(chǔ)在內(nèi)存中。當(dāng)我們積累足夠多的消息后,我們?cè)賹⑺鼈兣坎迦氲綌?shù)據(jù)庫(kù)中。如果我們?cè)O(shè)置offset自動(dòng)提交(之前說(shuō)的例子),消費(fèi)將被認(rèn)為是已消費(fèi)的。這樣會(huì)出現(xiàn)問(wèn)題,我們的進(jìn)程可能在批處理記錄之后,但在它們被插入到數(shù)據(jù)庫(kù)之前失敗了。
為了避免這種情況,我們將在相應(yīng)的記錄插入數(shù)據(jù)庫(kù)之后再手動(dòng)提交偏移量。這樣我們可以準(zhǔn)確控制消息是成功消費(fèi)的。提出一個(gè)相反的可能性:在插入數(shù)據(jù)庫(kù)之后,但是在提交之前,這個(gè)過(guò)程可能會(huì)失敗(即使這可能只是幾毫秒,這是一種可能性)。在這種情況下,進(jìn)程將獲取到已提交的偏移量,并會(huì)重復(fù)插入的最后一批數(shù)據(jù)。這種方式就是所謂的“至少一次”保證,在故障情況下,可以重復(fù)。
如果您無(wú)法執(zhí)行這些操作,可能會(huì)使已提交的偏移超過(guò)消耗的位置,從而導(dǎo)致缺少記錄。 使用手動(dòng)偏移控制的優(yōu)點(diǎn)是,您可以直接控制記錄何時(shí)被視為“已消耗”。
注意:使用自動(dòng)提交也可以“至少一次”。但是要求你必須下次調(diào)用poll(long)之前或關(guān)閉消費(fèi)者之前,處理完所有返回的數(shù)據(jù)。如果操作失敗,這將會(huì)導(dǎo)致已提交的offset超過(guò)消費(fèi)的位置,從而導(dǎo)致丟失消息。使用手動(dòng)控制offset的有點(diǎn)是,你可以直接控制消息何時(shí)提交。、
上面的例子使用commitSync表示所有收到的消息為”已提交",在某些情況下,你可以希望更精細(xì)的控制,通過(guò)指定一個(gè)明確消息的偏移量為“已提交”。在下面,我們的例子中,我們處理完每個(gè)分區(qū)中的消息后,提交偏移量。
try {while(running) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }注意:已提交的offset應(yīng)始終是你的程序?qū)⒆x取的下一條消息的offset。因此,調(diào)用commitSync(offsets)時(shí),你應(yīng)該加1個(gè)到最后處理的消息的offset。
訂閱指定的分區(qū)
在前面的例子中,我們訂閱我們感興趣的topic,讓kafka提供給我們平分后的topic分區(qū)。但是,在有些情況下,你可能需要自己來(lái)控制分配指定分區(qū),例如:
-
如果這個(gè)消費(fèi)者進(jìn)程與該分區(qū)保存了某種本地狀態(tài)(如本地磁盤的鍵值存儲(chǔ)),則它應(yīng)該只能獲取這個(gè)分區(qū)的消息。
-
如果消費(fèi)者進(jìn)程本身具有高可用性,并且如果它失敗,會(huì)自動(dòng)重新啟動(dòng)(可能使用集群管理框架如YARN,Mesos,或者AWS設(shè)施,或作為一個(gè)流處理框架的一部分)。 在這種情況下,不需要Kafka檢測(cè)故障,重新分配分區(qū),因?yàn)橄M(fèi)者進(jìn)程將在另一臺(tái)機(jī)器上重新啟動(dòng)。
要使用此模式,,你只需調(diào)用assign(Collection)消費(fèi)指定的分區(qū)即可:
String topic = "foo";TopicPartition partition0 = new TopicPartition(topic, 0);TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));一旦手動(dòng)分配分區(qū),你可以在循環(huán)中調(diào)用poll(跟前面的例子一樣)。消費(fèi)者分組仍需要提交offset,只是現(xiàn)在分區(qū)的設(shè)置只能通過(guò)調(diào)用assign修改,因?yàn)槭謩?dòng)分配不會(huì)進(jìn)行分組協(xié)調(diào),因此消費(fèi)者故障不會(huì)引發(fā)分區(qū)重新平衡。每一個(gè)消費(fèi)者是獨(dú)立工作的(即使和其他的消費(fèi)者共享GroupId)。為了避免offset提交沖突,通常你需要確認(rèn)每一個(gè)consumer實(shí)例的gorupId都是唯一的。
注意,手動(dòng)分配分區(qū)(即,assgin)和動(dòng)態(tài)分區(qū)分配的訂閱topic模式(即,subcribe)不能混合使用。
offset存儲(chǔ)在其他地方
消費(fèi)者可以不使用kafka內(nèi)置的offset倉(cāng)庫(kù)。可以選擇自己來(lái)存儲(chǔ)offset。要注意的是,將消費(fèi)的offset和結(jié)果存儲(chǔ)在同一個(gè)的系統(tǒng)中,用原子的方式存儲(chǔ)結(jié)果和offset,但這不能保證原子,要想消費(fèi)是完全原子的,并提供的“正好一次”的消費(fèi)保證比kafka默認(rèn)的“至少一次”的語(yǔ)義要更高。你需要使用kafka的offset提交功能。
這有結(jié)合的例子。
-
如果消費(fèi)的結(jié)果存儲(chǔ)在關(guān)系數(shù)據(jù)庫(kù)中,存儲(chǔ)在數(shù)據(jù)庫(kù)的offset,讓提交結(jié)果和offset在單個(gè)事務(wù)中。這樣,事物成功,則offset存儲(chǔ)和更新。如果offset沒(méi)有存儲(chǔ),那么偏移量也不會(huì)被更新。
-
如果offset和消費(fèi)結(jié)果存儲(chǔ)在本地倉(cāng)庫(kù)。例如,可以通過(guò)訂閱一個(gè)指定的分區(qū)并將offset和索引數(shù)據(jù)一起存儲(chǔ)來(lái)構(gòu)建一個(gè)搜索索引。如果這是以原子的方式做的,常見的可能是,即使崩潰引起未同步的數(shù)據(jù)丟失。索引程序從它確保沒(méi)有更新丟失的地方恢復(fù),而僅僅丟失最近更新的消息。
每個(gè)消息都有自己的offset,所以要管理自己的偏移,你只需要做到以下幾點(diǎn):
-
配置 enable.auto.commit=false
-
使用提供的 ConsumerRecord 來(lái)保存你的位置。
-
在重啟時(shí)用 seek(TopicPartition, long) 恢復(fù)消費(fèi)者的位置。
當(dāng)分區(qū)分配也是手動(dòng)完成的(像上文搜索索引的情況),這種類型的使用是最簡(jiǎn)單的。 如果分區(qū)分配是自動(dòng)完成的,需要特別小心處理分區(qū)分配變更的情況。可以通過(guò)調(diào)用subscribe(Collection,ConsumerRebalanceListener)和subscribe(Pattern,ConsumerRebalanceListener)中提供的ConsumerRebalanceListener實(shí)例來(lái)完成的。例如,當(dāng)分區(qū)向消費(fèi)者獲取時(shí),消費(fèi)者將通過(guò)實(shí)現(xiàn)ConsumerRebalanceListener.onPartitionsRevoked(Collection)來(lái)給這些分區(qū)提交它們offset。當(dāng)分區(qū)分配給消費(fèi)者時(shí),消費(fèi)者通過(guò)ConsumerRebalanceListener.onPartitionsAssigned(Collection)為新的分區(qū)正確地將消費(fèi)者初始化到該位置。
ConsumerRebalanceListener的另一個(gè)常見用法是清除應(yīng)用已移動(dòng)到其他位置的分區(qū)的緩存。
控制消費(fèi)的位置
大多數(shù)情況下,消費(fèi)者只是簡(jiǎn)單的從頭到尾的消費(fèi)消息,周期性的提交位置(自動(dòng)或手動(dòng))。kafka也支持消費(fèi)者去手動(dòng)的控制消費(fèi)的位置,可以消費(fèi)之前的消息也可以跳過(guò)最近的消息。
有幾種情況,手動(dòng)控制消費(fèi)者的位置可能是有用的。
一種場(chǎng)景是對(duì)于時(shí)間敏感的消費(fèi)者處理程序,對(duì)足夠落后的消費(fèi)者,直接跳過(guò),從最近的消費(fèi)開始消費(fèi)。
另一個(gè)使用場(chǎng)景是本地狀態(tài)存儲(chǔ)系統(tǒng)(上一節(jié)說(shuō)的)。在這樣的系統(tǒng)中,消費(fèi)者將要在啟動(dòng)時(shí)初始化它的位置(無(wú)論本地存儲(chǔ)是否包含)。同樣,如果本地狀態(tài)已被破壞(假設(shè)因?yàn)榇疟P丟失),則可以通過(guò)重新消費(fèi)所有數(shù)據(jù)并重新創(chuàng)建狀態(tài)(假設(shè)kafka保留了足夠的歷史)在新的機(jī)器上重新創(chuàng)建。
kafka使用seek(TopicPartition, long)指定新的消費(fèi)位置。用于查找服務(wù)器保留的最早和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))。
消費(fèi)者流量控制
如果消費(fèi)者分配了多個(gè)分區(qū),并同時(shí)消費(fèi)所有的分區(qū),這些分區(qū)具有相同的優(yōu)先級(jí)。在一些情況下,消費(fèi)者需要首先消費(fèi)一些指定的分區(qū),當(dāng)指定的分區(qū)有少量或者已經(jīng)沒(méi)有可消費(fèi)的數(shù)據(jù)時(shí),則開始消費(fèi)其他分區(qū)。
例如流處理,當(dāng)處理器從2個(gè)topic獲取消息并把這兩個(gè)topic的消息合并,當(dāng)其中一個(gè)topic長(zhǎng)時(shí)間落后另一個(gè),則暫停消費(fèi),以便落后的趕上來(lái)。
kafka支持動(dòng)態(tài)控制消費(fèi)流量,分別在future的poll(long)中使用pause(Collection) 和 resume(Collection) 來(lái)暫停消費(fèi)指定分配的分區(qū),重新開始消費(fèi)指定暫停的分區(qū)。
多線程處理
Kafka消費(fèi)者不是線程安全的。所有網(wǎng)絡(luò)I/O都發(fā)生在進(jìn)行調(diào)用應(yīng)用程序的線程中。用戶的責(zé)任是確保多線程訪問(wèn)正確同步的。非同步訪問(wèn)將導(dǎo)致ConcurrentModificationException。
此規(guī)則唯一的例外是wakeup(),它可以安全地從外部線程來(lái)中斷活動(dòng)操作。在這種情況下,將從操作的線程阻塞并拋出一個(gè)WakeupException。這可用于從其他線程來(lái)關(guān)閉消費(fèi)者。 以下代碼段顯示了典型模式:
public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } }在單獨(dú)的線程中,可以通過(guò)設(shè)置關(guān)閉標(biāo)志和喚醒消費(fèi)者來(lái)關(guān)閉消費(fèi)者。
closed.set(true);consumer.wakeup();我們沒(méi)有多線程模型的例子。但留下幾個(gè)操作可用來(lái)實(shí)現(xiàn)多線程處理消息。
每個(gè)線程一個(gè)消費(fèi)者
每個(gè)線程自己的消費(fèi)者實(shí)例。這里是這種方法的優(yōu)點(diǎn)和缺點(diǎn):
- PRO: 這是最容易實(shí)現(xiàn)的
- PRO: 因?yàn)樗恍枰诰€程之間協(xié)調(diào),所以通常它是最快的。
- PRO: 它按順序處理每個(gè)分區(qū)(每個(gè)線程只處理它接受的消息)。
- CON: 更多的消費(fèi)者意味著更多的TCP連接到集群(每個(gè)線程一個(gè))。一般kafka處理連接非常的快,所以這是一個(gè)小成本。
- CON: 更多的消費(fèi)者意味著更多的請(qǐng)求被發(fā)送到服務(wù)器,但稍微較少的數(shù)據(jù)批次可能導(dǎo)致I/O吞吐量的一些下降。
- CON: 所有進(jìn)程中的線程總數(shù)受到分區(qū)總數(shù)的限制。
解耦消費(fèi)和處理
另一個(gè)替代方式是一個(gè)或多個(gè)消費(fèi)者線程,它來(lái)消費(fèi)所有數(shù)據(jù),其消費(fèi)所有數(shù)據(jù)并將ConsumerRecords實(shí)例切換到由實(shí)際處理記錄處理的處理器線程池來(lái)消費(fèi)的阻塞隊(duì)列。這個(gè)選項(xiàng)同樣有利弊:
- PRO: 可擴(kuò)展消費(fèi)者和處理進(jìn)程的數(shù)量。這樣單個(gè)消費(fèi)者的數(shù)據(jù)可分給多個(gè)處理器線程來(lái)執(zhí)行,避免對(duì)分區(qū)的任何限制。
- CON: 跨多個(gè)處理器的順序保證需要特別注意,因?yàn)榫€程是獨(dú)立的執(zhí)行,后來(lái)的消息可能比遭到的消息先處理,這僅僅是因?yàn)榫€程執(zhí)行的運(yùn)氣。如果對(duì)排序沒(méi)有問(wèn)題,這就不是個(gè)問(wèn)題。
- CON: 手動(dòng)提交變得更困難,因?yàn)樗枰獏f(xié)調(diào)所有的線程以確保處理對(duì)該分區(qū)的處理完成。
這種方法有多種玩法,例如,每個(gè)處理線程可以有自己的隊(duì)列,消費(fèi)者線程可以使用TopicPartitionhash到這些隊(duì)列中,以確保按順序消費(fèi),并且提交也將簡(jiǎn)化。
作者:半獸人
鏈接:http://orchome.com/451
來(lái)源:OrcHome
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
轉(zhuǎn)載于:https://www.cnblogs.com/Llh-Forerer2015/p/9668060.html
總結(jié)
以上是生活随笔為你收集整理的Kafka消费者APi的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 浅析Block的内部结构 , 及分析其是
- 下一篇: 对比let、const、var的异同