kafka 发布订阅_在Kafka中发布订阅模型
kafka 發(fā)布訂閱
這是第四個(gè)柱中的一系列關(guān)于同步客戶端集成與異步系統(tǒng)( 1, 2, 3 )。 在這里,我們將嘗試了解Kafka的工作方式,以便正確利用其發(fā)布-訂閱實(shí)現(xiàn)。
卡夫卡概念
根據(jù)官方文件 :
Kafka是一種分布式的,分區(qū)的,復(fù)制的提交日志服務(wù)。 它提供消息傳遞系統(tǒng)的功能,但具有獨(dú)特的設(shè)計(jì)。
Kafka作為集群運(yùn)行,這些節(jié)點(diǎn)稱為代理。 代理可以是領(lǐng)導(dǎo)者或副本,以提供高可用性和容錯(cuò)能力。 代理負(fù)責(zé)分區(qū),分區(qū)是存儲(chǔ)消息的分發(fā)單元。 這些消息是有序的,可以通過名為offset的索引進(jìn)行訪問。 一組分區(qū)構(gòu)成一個(gè)主題,是消息的提要。 分區(qū)可以具有不同的使用者,并且它們使用自己的偏移量訪問消息。 生產(chǎn)者將消息發(fā)布到Kafka主題中。 Kafka文檔中的以下圖表可以幫助您理解這一點(diǎn):
排隊(duì)與發(fā)布-訂閱
消費(fèi)者群體是另一個(gè)關(guān)鍵概念,有助于解釋為什么Kafka比RabbitMQ等其他消息傳遞解決方案更靈活,功能更強(qiáng)大。 消費(fèi)者與消費(fèi)者群體相關(guān)聯(lián)。 如果每個(gè)使用者都屬于同一個(gè)使用者組,則主題的消息將在各個(gè)使用者之間平均負(fù)載均衡; 這就是所謂的“排隊(duì)模型”。 相反,如果每個(gè)使用者都屬于不同的使用者組,則所有消息都將在每個(gè)客戶端中使用。 這就是所謂的“發(fā)布-訂閱”模型。
您可以混合使用這兩種方法,分別針對(duì)不同的需求使用不同的邏輯使用者組,并在每個(gè)組中有多個(gè)使用者以通過并行提高吞吐量。 同樣, Kafka文檔中的另一個(gè)圖表:
了解我們的需求
正如我們在以前的文章(見1, 2, 3 )該項(xiàng)目服務(wù)發(fā)布消息到卡夫卡的話題叫item_deleted 。 此消息將位于該主題的一個(gè)分區(qū)中。 為了定義消息將駐留在哪個(gè)分區(qū),Kafka提供了三種選擇 :
- 如果記錄中指定了分區(qū),請(qǐng)使用它
- 如果未指定分區(qū)但存在密鑰,則根據(jù)密鑰的哈希值選擇一個(gè)分區(qū)
- 如果不存在分區(qū)或密鑰,則以循環(huán)方式選擇一個(gè)分區(qū)
我們將使用item_id作為密鑰。 執(zhí)法服務(wù)的不同實(shí)例中包含的消費(fèi)者僅對(duì)特定分區(qū)感興趣,因?yàn)樗麄儽A裟承╉?xiàng)目的內(nèi)部狀態(tài)。 讓我們檢查不同的Kafka使用者實(shí)現(xiàn),以了解哪種使用最方便。
卡夫卡消費(fèi)者
卡夫卡共有三個(gè)消費(fèi)者: 高級(jí)消費(fèi)者 , 簡單消費(fèi)者和新消費(fèi)者
在這三個(gè)消費(fèi)者中, 簡單消費(fèi)者在最低級(jí)別上運(yùn)行。 它滿足我們的要求,因?yàn)樗试S消費(fèi)者“在流程中僅使用主題中分區(qū)的子集”。 但是,如文檔所述:
SimpleConsumer確實(shí)需要使用者組中不需要的大量工作:
- 您必須跟蹤應(yīng)用程序中的偏移量,才能知道從何處停止消費(fèi)
- 您必須確定哪個(gè)Broker是主題和分區(qū)的主要Broker。
- 您必須處理經(jīng)紀(jì)人負(fù)責(zé)人變更
如果您閱讀了建議的用于處理這些問題的代碼,則將不鼓勵(lì)您使用此使用者。
新使用者提供正確的抽象級(jí)別,并允許我們訂閱特定的分區(qū)。 他們在文檔中建議以下用例:
第一種情況是,如果進(jìn)程正在維護(hù)與該分區(qū)相關(guān)聯(lián)的某種本地狀態(tài)(例如本地磁盤上的鍵值存儲(chǔ)),因此該進(jìn)程應(yīng)僅獲取其在磁盤上維護(hù)的分區(qū)的記錄。
不幸的是,我們的系統(tǒng)使用的是Kafka 0.8,而該使用者僅從0.9開始可用。 我們沒有足夠的資源來遷移到該版本,因此我們需要堅(jiān)持使用高級(jí)消費(fèi)者 。
該使用者提供了一個(gè)不錯(cuò)的API,但不允許我們訂閱特定的分區(qū)。 這意味著,執(zhí)法服務(wù)的每個(gè)實(shí)例都將使用每條消息,甚至是無關(guān)的消息。 我們可以通過為每個(gè)實(shí)例定義不同的消費(fèi)者組來實(shí)現(xiàn)這一目標(biāo)。
利用Akka Event Bus
在上一篇文章中,我們定義了一些等待ItemDeleted消息的有限狀態(tài)機(jī)ItemDeleted 。
when(Active) {case Event(ItemDeleted(item), currentItemsToBeDeleted@ItemsToBeDeleted(items)) =>val newItemsToBeDeleted = items.filterNot(_ == item)newItemsToBeDeleted.size match {case 0 => finishWorkWith(CensorResult(Right()))case _ => stay using currentItemsToBeDeleted.copy(items = newItemsToBeDeleted)}}我們的卡夫卡消費(fèi)者可以將所有消息轉(zhuǎn)發(fā)給那些演員,并讓他們丟棄/過濾不相關(guān)的物品。 但是,我們不想讓演員浪費(fèi)很多多余的工作,因此我們將添加一層抽象,讓他們以真正有效的方式丟棄適當(dāng)?shù)南ⅰ?
final case class MsgEnvelope(partitionKey: String, payload: ItemDeleted)class ItemDeletedBus extends EventBus with LookupClassification {override type Event = MsgEnvelopeoverride type Classifier = Stringoverride type Subscriber = ActorRefoverride protected def mapSize(): Int = 128override protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event.payloadoverride protected def classify(event: Event): Classifier = event.partitionKeyoverride protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a.compareTo(b) }Akka Event Bus按分區(qū)為我們提供訂閱,而我們的Kafka高級(jí)消費(fèi)者中缺少該分區(qū)。 我們將從卡夫卡消費(fèi)者處發(fā)布每條消息到公交車上:
itemDeletedBus.publish(MsgEnvelope(item.partitionKey, ItemDeleted(item)))在上一篇文章中,我們展示了如何使用該分區(qū)鍵訂閱消息:
itemDeletedBus.subscribe(self, item.partitionKey)LookupClassification將過濾不需要的消息,因此我們的參與者不會(huì)過載。
摘要
得益于Kafka提供的靈活性,我們能夠設(shè)計(jì)我們的系統(tǒng)以了解不同的折衷方案。 在接下來的文章中,我們將看到如何協(xié)調(diào)這些FSM的結(jié)果以向客戶端提供同步響應(yīng)。
第一部分 | 第2部分 | 第三部分
翻譯自: https://www.javacodegeeks.com/2016/05/publish-subscribe-model-kafka.html
kafka 發(fā)布訂閱
總結(jié)
以上是生活随笔為你收集整理的kafka 发布订阅_在Kafka中发布订阅模型的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑加速cpu(电脑加速器上外网)
- 下一篇: 电脑qq分身版(qq分身版)