临近春招,Kafka是不是忘完了,给你一文概括Kafka
1. Kafka 概述
?
1.1?定義
?
Kakfa 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(message queue),主要應(yīng)用于大數(shù)據(jù)的實(shí)時(shí)處理領(lǐng)域。
?
1.2?消息隊(duì)列
?
1.2.1 傳統(tǒng)消息隊(duì)列與新式消息隊(duì)列模式
?
?
上面是傳統(tǒng)的消息隊(duì)列,比如一個(gè)用戶要注冊信息,當(dāng)用戶信息寫入數(shù)據(jù)庫后,后面還有一些其他流程,比如發(fā)送短信,則需要等這些流程處理完成后,再返回給用戶。而新式隊(duì)列,比如一個(gè)用戶注冊信息,數(shù)據(jù)直接丟進(jìn)數(shù)據(jù)庫,就直接返回給用戶成功。
?
1.2.2?使用消息隊(duì)列的好處
?
-
解耦
-
可恢復(fù)性
-
緩沖
-
靈活性與峰值處理能力
-
異步通信
?
1.2.3?消息隊(duì)列的模式
?
1)?點(diǎn)對點(diǎn)模式
?
消息生產(chǎn)者發(fā)送消息到消息隊(duì)列中,然后消息消費(fèi)者從隊(duì)列中取出并且消費(fèi)消息,消息被消費(fèi)后,隊(duì)列中不在存儲。所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息;隊(duì)列支持存在多個(gè)消費(fèi)者,但是對于一個(gè)消息而言,只會 有一個(gè)消費(fèi)者可以消費(fèi);如果想發(fā)給多個(gè)消費(fèi)者,則需要多次發(fā)送該條消息。
?
2)?發(fā)布/訂閱模式(一對多,消費(fèi)者消費(fèi)數(shù)據(jù)之后不會清除消息)
?
消息生產(chǎn)者將消息發(fā)布到 topic 中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對點(diǎn)的方式不同,發(fā)布到 topic 的消息會被所有的訂閱者消費(fèi);但是數(shù)據(jù)保留是期限的,默認(rèn)是 7 天,因?yàn)樗皇谴鎯ο到y(tǒng)。Kafka 就是這種模式的。有兩種方式,一種是是消費(fèi)者去主動去消費(fèi)(拉取)消息,而不是生產(chǎn)者推送消息給消費(fèi)者;另外一種就是生產(chǎn)者主動推送消息給消費(fèi)者,類似公眾號。
?
1.3 Kafka 基礎(chǔ)架構(gòu)
?
?
Kafka 的基礎(chǔ)架構(gòu)主要有 broker、生產(chǎn)者、消費(fèi)者組構(gòu)成,當(dāng)前還包括 ZooKeeper。
?
?
生產(chǎn)者負(fù)責(zé)發(fā)送消息,broker?負(fù)責(zé)緩沖消息,broker 中可以創(chuàng)建 topic,每個(gè) topic 又有 partition 和 replication 的概念。
?
消費(fèi)者組負(fù)責(zé)處理消息,同一個(gè)消費(fèi)者組的中消費(fèi)者不能消費(fèi)同一個(gè) partition 中的數(shù)據(jù)。消費(fèi)者組主要是提高消費(fèi)能力,比如之前是一個(gè)消費(fèi)者消費(fèi) 100?條數(shù)據(jù),現(xiàn)在是 2 個(gè)消費(fèi)者消費(fèi) 100 條數(shù)據(jù),可以提高消費(fèi)能力。所以消費(fèi)者組的消費(fèi)者的個(gè)數(shù)要小于 partition 的個(gè)數(shù),不然就會有消費(fèi)者沒有 partition 可以消費(fèi),造成資源的浪費(fèi)。
?
注意:不同消費(fèi)者組的消費(fèi)者是可以消費(fèi)相同的 partition 數(shù)據(jù)。
?
Kakfa?如果要組件集群,則只需要注冊到一個(gè) ZooKeeper?中就可以了,ZooKeeper?中還保留消息消費(fèi)的進(jìn)度或者說偏移量或者消費(fèi)位置。
?
-
0.9?之前的版本偏移量存儲在?ZooKeeper;
-
0.9?之后的版本偏移量存儲在 Kafka中。Kafka 定義了一個(gè)系統(tǒng) topic,專用用來存儲偏移量的數(shù)據(jù)。
?
為什么要改?
?
主要是考慮到頻繁更改偏移量,對?ZooKeeper?的壓力較大,而且 Kafka 本身自己的處理也較復(fù)雜。
?
1.4 安裝 Kafka
?
1) Kafka 的安裝只需要解壓安裝包就可以完成安裝。
?
tar -zxvf kafka_2.11-2.1.1.tgz -C /usr/local/?
2) 查看配置文件。
?
[root@es1 config]# pwd /usr/local/kafka/config [root@es1 config]# ll total 84 -rw-r--r--. 1 root root 906 Feb 8 2019 connect-console-sink.properties -rw-r--r--. 1 root root 909 Feb 8 2019 connect-console-source.properties -rw-r--r--. 1 root root 5321 Feb 8 2019 connect-distributed.properties -rw-r--r--. 1 root root 883 Feb 8 2019 connect-file-sink.properties -rw-r--r--. 1 root root 881 Feb 8 2019 connect-file-source.properties -rw-r--r--. 1 root root 1111 Feb 8 2019 connect-log4j.properties -rw-r--r--. 1 root root 2262 Feb 8 2019 connect-standalone.properties -rw-r--r--. 1 root root 1221 Feb 8 2019 consumer.properties -rw-r--r--. 1 root root 4727 Feb 8 2019 log4j.properties -rw-r--r--. 1 root root 1925 Feb 8 2019 producer.properties -rw-r--r--. 1 root root 6865 Jan 16 22:00 server-1.properties -rw-r--r--. 1 root root 6865 Jan 16 22:00 server-2.properties -rw-r--r--. 1 root root 6873 Jan 16 03:57 server.properties -rw-r--r--. 1 root root 1032 Feb 8 2019 tools-log4j.properties -rw-r--r--. 1 root root 1169 Feb 8 2019 trogdor.conf -rw-r--r--. 1 root root 1023 Feb 8 2019 zookeeper.properties?
3) 修改配置文件 server.properties。
?
設(shè)置 broker.id 這個(gè)是 Kafka 集群區(qū)分每個(gè)節(jié)點(diǎn)的唯一標(biāo)志符。
?
?
4) 設(shè)置 Kafka 的數(shù)據(jù)存儲路徑。
?
?
注意:這個(gè)目錄下不能有其他非 Kafka 目錄,不然會導(dǎo)致 Kafka 集群無法啟動。
?
5) 設(shè)置是否可以刪除 topic,默認(rèn) Kafka 的 topic 是不允許刪除的。
?
?
6) Kafka 的數(shù)據(jù)保留的時(shí)間,默認(rèn)是 7 天。
?
?
7) Log 文件最大的大小,如果 log 文件超過 1 G 會創(chuàng)建一個(gè)新的文件。
?
?
?
8) Kafka 連接的 ZooKeeper 的地址和連接 Kafka 的超時(shí)時(shí)間。
?
?
9) 默認(rèn)的 partition 的個(gè)數(shù)。
?
?
1.5 啟動 Kafka
?
1) 啟動方式一,Kafka 只能單節(jié)點(diǎn)啟動,所以每個(gè) Kakfa 節(jié)點(diǎn)都需要手動啟動,下面的方式阻塞的方式啟動。
?
?
2) 啟動方式二,守護(hù)的方式啟動,推薦使用。
?
?
1.6 Kafka 操作
?
1) 查看當(dāng)前 Kafka 集群已有的 topic。
?
?
注意:這里連接的 ZooKeeper,而不是連接的 Kafka。
?
2) 創(chuàng)建 topic,指定分片和副本個(gè)數(shù)。
?
?
說明:replication-factor?副本數(shù),replication-factor?分區(qū)數(shù),topic?主題名。
?
?
如果當(dāng)前 Kafka 集群只有 3 個(gè) broker 節(jié)點(diǎn),則 replication-factor 最大就是 3 了,下面的例子創(chuàng)建副本為 4,則會報(bào)錯。
?
?
3) 刪除 topic。
?
?
4) 查看 topic 信息。
?
?
?
1.7 啟動生產(chǎn)者生產(chǎn)消息,Kafka 自帶一個(gè)生產(chǎn)者和消費(fèi)者的客戶端
?
1) 啟動一個(gè)生產(chǎn)者,注意此時(shí)連的 9092 端口,連接的 Kafka 集群。
?
?
2) 啟動一個(gè)消費(fèi)者,注意此時(shí)連接的還是 9092 端口,在 0.9 版本之前連接的還是 2181 端口。
?
?
這里我們啟動 2 個(gè)消費(fèi)者來測試一下。
?
?
說明:如果不指定的消費(fèi)者組的配置文件的話,默認(rèn)每個(gè)消費(fèi)者都屬于不同的消費(fèi)者組。
?
3) 發(fā)送消息,可以看到每個(gè)消費(fèi)者都能收到消息。
?
?
?
?
4)Kakfa 中的實(shí)際數(shù)據(jù)。
?
?
?
2. Kafka 架構(gòu)深入
?
?
Kafka 不能保證消息的全局有序,只能保證消息在 partition 內(nèi)有序,因?yàn)橄M(fèi)者消費(fèi)消息是在不同的 partition 中隨機(jī)的。
?
2.1 Kafka 的工作流程
?
Kafka 中的消息是以 topic 進(jìn)行分類的,生產(chǎn)者生成消息、消費(fèi)者消費(fèi)消息都面向 topic。
?
?
Topic 是一個(gè)邏輯上的概念,而 partition 是物理上的概念。每個(gè) partition 又有副本的概念。每個(gè) partition 對應(yīng)于一個(gè) log 文件,該 log 文件中存儲的就是生產(chǎn)者生成的數(shù)據(jù),生產(chǎn)者生成的數(shù)據(jù)會不斷的追加到該 log 的文件末端,且每條數(shù)據(jù)都有自己的 offset,消費(fèi)者都會實(shí)時(shí)記錄自己消費(fèi)到了那個(gè) offset,以便出錯的時(shí)候從上次的位置繼續(xù)消費(fèi),這個(gè) offset 就保存在 index 文件中。Kafka 的 offset 是分區(qū)內(nèi)有序的,但是在不同分區(qū)中是無順序的,Kafka 不保證數(shù)據(jù)的全局有序。
?
2.2 Kafka 原理
?
由于生產(chǎn)者生產(chǎn)的消息會不斷追加到 log 文件的末尾,為防止 log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka 采用分片和索引的機(jī)制,將每個(gè) partition 分為多個(gè) segment,每個(gè) segment 對應(yīng)2個(gè)文件 — index 文件和 log 文件,這 2 個(gè)文件位于一個(gè)相同的文件夾下,文件夾的命名規(guī)則為:topic 名稱 + 分區(qū)序號。
?
?
Index 和 log 的文件的文件名是當(dāng)前這個(gè)索引是最小的數(shù)據(jù)的 offset。Kafka?如何快速的消費(fèi)數(shù)據(jù)呢?
?
?
Index 文件中存儲的數(shù)據(jù)的索引信息,第一列是 offset,第二列這這個(gè)數(shù)據(jù)所對應(yīng)的 log 文件中的偏移量,就像我們?nèi)プx文件,使用 seek() 設(shè)置當(dāng)前鼠標(biāo)的位置一樣,可以更快的找到數(shù)據(jù)。
?
如果要去消費(fèi) offset 為 3 的數(shù)據(jù),首先通過二分法找到數(shù)據(jù)在哪個(gè) index 文件中,然后在通過 index 中 offset 找到數(shù)據(jù)在 log 文件中的 offset;這樣就可以快速的定位到數(shù)據(jù),并消費(fèi)。
?
所以,Kakfa 雖然把數(shù)據(jù)存儲在磁盤中,但是他的讀取速度還是非常快的。
?
3. Kafka 生產(chǎn)者和消費(fèi)者
?
3.1 Kafka 生產(chǎn)者
?
Kafka 的 partition 分區(qū)的作用
?
Kafka 分區(qū)的原因主要就是提供并發(fā)提高性能,因?yàn)樽x寫是 partition 為單位讀寫的;那生產(chǎn)者發(fā)送消息是發(fā)送到哪個(gè)?partition?中呢?
?
在客戶端中指定 partition;
輪詢(推薦)消息1去 p1,消息2去 p2,消息3去 p3,消息4去 p1,消息5去 p2,消息6去 p3……
?
3.2 Kafka 如何保證數(shù)據(jù)可靠性呢?通過 ack 來保證
?
為保證生產(chǎn)者發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的 topic,topic 的每個(gè) partition 收到生產(chǎn)者發(fā)送的數(shù)據(jù)后,都需要向生產(chǎn)者發(fā)送 ack(確認(rèn)收到),如果生產(chǎn)者收到 ack,就會進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。
?
?
?
那么 Kafka 什么時(shí)候向生產(chǎn)者發(fā)送 ack?
?
確保 follower 和 leader 同步完成,leader 在發(fā)送 ack 給生產(chǎn)者,這樣才能確保 leader 掛掉之后,能在 follower 中選舉出新的 leader 后,數(shù)據(jù)不會丟失。
?
那多少個(gè) follower 同步完成后發(fā)送 ack?
?
-
方案1:半數(shù)已經(jīng)完成同步,就發(fā)送 ack;
-
方案2:全部完成同步,才發(fā)送 ack(Kafka采用這種方式)
?
采用第二種方案后,設(shè)想以下場景:leader 收到數(shù)據(jù),所有的 follower 都開始同步數(shù)據(jù),但是有一個(gè) follower 因?yàn)槟撤N故障,一直無法完成同步,那 leader 就要一直等下,直到他同步完成,才能發(fā)送 ack。這樣就非常影響效率,這個(gè)問題怎么解決?
?
?
Leader 維護(hù)了一個(gè)動態(tài)的 ISR 列表(同步副本的作用),只需要這個(gè)列表的中的 follower 和 leader 同步;當(dāng) ISR 中的 follower 完成數(shù)據(jù)的同步之后,leader 就會給生產(chǎn)者發(fā)送 ack,如果 follower 長時(shí)間未向 leader 同步數(shù)據(jù),則該 follower 將被剔除ISR,這個(gè)時(shí)間閾值也是自定義的;同樣 leader 故障后,就會從 ISR 中選舉新的 leader。
?
怎么選擇 ISR 的節(jié)點(diǎn)呢?
?
首先通信的時(shí)間要快,要和 leader 要可以很快的完成通信,這個(gè)時(shí)間默認(rèn)是 10s
然后就看 leader 數(shù)據(jù)差距,消息條數(shù)默認(rèn)是 10000 條(后面版本被移除)
為什么移除:因?yàn)?Kafka 發(fā)送消息是批量發(fā)送的,所以會一瞬間 leader 接受完成,但是 follower 還沒有拉取,所以會頻繁踢出和加入ISR,這個(gè)數(shù)據(jù)會保存到 ZooKeeper 和內(nèi)存中,所以會頻繁更新?ZooKeeper?和內(nèi)存。
?
但是對于某些不太重要的數(shù)據(jù),對數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等 ISR 中的 follower 全部接受成功。所以 Kafka 為用戶提供了三種可靠性級別,用戶可以根據(jù)可靠性和延遲進(jìn)行權(quán)衡,這個(gè)設(shè)置在kafka的生成中設(shè)置:ack 參數(shù)設(shè)置。
?
?
1) acks 為 0
?
生產(chǎn)者不等 ack,只管往 topic 丟數(shù)據(jù)就可以了,這個(gè)丟數(shù)據(jù)的概率非常高。
?
2) ack為 1
?
leader 落盤后就會返回 ack,會有數(shù)據(jù)丟失的現(xiàn)象,如果 leader 在同步完成后出現(xiàn)故障,則會出現(xiàn)數(shù)據(jù)丟失。
?
3) ack為-1(all)
?
leader 和 follower(ISR)落盤才會返回 ack,會有數(shù)據(jù)重復(fù)現(xiàn)象,如果在 leader 已經(jīng)寫完成,且 follower 同步完成,但是在返回ack的出現(xiàn)故障,則會出現(xiàn)數(shù)據(jù)重復(fù)現(xiàn)象;極限情況下,這個(gè)也會有數(shù)據(jù)丟失的情況,比如 follower 和 leader 通信都很慢,所以 ISR 中只有一個(gè) leader 節(jié)點(diǎn),這個(gè)時(shí)候,leader 完成落盤,就會返回 ack,如果此時(shí) leader 故障后,就會導(dǎo)致丟失數(shù)據(jù)。
?
3.3 Kafka 如何保證消費(fèi)數(shù)據(jù)的一致性?通過HW來保證
?
?
-
LEO:指每個(gè) follower 的最大的 offset;
-
HW(高水位):指消費(fèi)者能見到的最大的 offset,LSR 隊(duì)列中最小的 LEO,也就是說消費(fèi)者只能看到1~6的數(shù)據(jù),后面的數(shù)據(jù)看不到,也消費(fèi)不了。
?
避免 leader 掛掉后,比如當(dāng)前消費(fèi)者消費(fèi)8這條數(shù)據(jù)后,leader 掛了,此時(shí)比如 f2 成為 leader,f2 根本就沒有9這條數(shù)據(jù),那么消費(fèi)者就會報(bào)錯,所以設(shè)計(jì)了 HW 這個(gè)參數(shù),只暴露最少的數(shù)據(jù)給消費(fèi)者,避免上面的問題。
?
3.3.1 HW 保證數(shù)據(jù)存儲的一致性
?
1) follower 故障
?
follower 發(fā)生故障后會被臨時(shí)提出 LSR,待該 follower 恢復(fù)后,follower 會讀取本地的磁盤記錄的上次的 HW,并將該 log 文件高于 HW 的部分截取掉,從 HW 開始向 leader 進(jìn)行同步,等該 follower 的 LEO 大于等于該 partition 的 hw,即 follower 追上leader后,就可以重新加入 LSR。
?
2) leader 故障
?
leader 發(fā)生故障后,會從 ISR 中選出一個(gè)新的 leader,之后,為了保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的 follower 會先將各自的 log 文件高于 hw 的部分截掉(新 leader 自己不會截掉),然后從新的 leader 同步數(shù)據(jù)。
?
注意:這個(gè)是為了保證多個(gè)副本間的數(shù)據(jù)存儲的一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
?
3.3.2 精準(zhǔn)一次(冪等性),保證數(shù)據(jù)不重復(fù)
?
-
ack 設(shè)置為 -1,則可以保證數(shù)據(jù)不丟失,但是會出現(xiàn)數(shù)據(jù)重復(fù)(at least once)
-
ack 設(shè)置為 0,則可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失(at most once)
?
但是如果魚和熊掌兼得,該怎么辦?這個(gè)時(shí)候就就引入了 Exact once(精準(zhǔn)一次)。
?
在 0.11 版本后,引入冪等性解決 Kakfa 集群內(nèi)部的數(shù)據(jù)重復(fù),在 0.11 版本之前,在消費(fèi)者處自己做處理。如果啟用了冪等性,則 ack 默認(rèn)就是-1,Kafka 就會為每個(gè)生產(chǎn)者分配一個(gè) pid,并未每條消息分配 seqnumber,如果 pid、partition、seqnumber 三者一樣,則 Kafka 認(rèn)為是重復(fù)數(shù)據(jù),就不會落盤保存;但是如果生產(chǎn)者掛掉后,也會出現(xiàn)有數(shù)據(jù)重復(fù)的現(xiàn)象;所以冪等性解決在單次會話的單個(gè)分區(qū)的數(shù)據(jù)重復(fù),但是在分區(qū)間或者跨會話的是數(shù)據(jù)重復(fù)的是無法解決的。
?
?
3.4 Kafka 消費(fèi)者
?
3.4.1 消費(fèi)方式
?
消息隊(duì)列有兩種消費(fèi)消息的方式,push(微信公眾號)、pull(kafka)。push 模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄M(fèi)發(fā)送速率是由 broker 決定的,他的目標(biāo)是盡可能以最快的的速度傳遞消息,但是這樣很容易造成消費(fèi)者來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 的方式可以消費(fèi)者的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
?
pull 模式的不足之處是如果 Kafka 沒有數(shù)據(jù),消費(fèi)者可能會陷入死循環(huán),一直返回空數(shù)據(jù),針對這一點(diǎn),Kafka 消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)候回傳遞一個(gè) timeout 參數(shù),如果當(dāng)時(shí)沒有數(shù)據(jù)可供消費(fèi),消費(fèi)者會等待一段時(shí)間在返回。
?
3.4.2 分區(qū)分配策略
?
一個(gè)消費(fèi)者組有多個(gè)消費(fèi)者,一個(gè) topic 有多個(gè) partition。所以必然會涉及到 partition 的分配問題,即確定哪個(gè) partition 由哪個(gè)消費(fèi)者來消費(fèi)。Kafka 提供兩種方式,一種是輪詢(RountRobin)對于 topic 組生效,一種是(Range)對于單個(gè)topic生效
?
輪詢:前置條件是需要一個(gè)消費(fèi)者里的消費(fèi)者訂閱的是相同的 topic。不然就會出現(xiàn)問題;非默認(rèn)的的方式。
?
同一個(gè)消費(fèi)者組里的消費(fèi)者不能同時(shí)消費(fèi)同一個(gè)分區(qū),比如三個(gè)消費(fèi)者消費(fèi)一個(gè) topic 的 9 個(gè)分區(qū)。
?
?
?
如果一個(gè)消費(fèi)者組里有2個(gè)消費(fèi)者,這個(gè)消費(fèi)者組里同時(shí)消費(fèi) 2 個(gè) topic,每個(gè) topic 又有三個(gè) partition。首先會把 2 個(gè) topic 當(dāng)做一個(gè)主題,然后根據(jù) topic 和 partition 做 hash,然后在按照 hash 排序。然后輪詢分配給一個(gè)消費(fèi)者組中的 2 個(gè)消費(fèi)者。
?
?
如果是下面這樣的方式訂閱的呢?
?
比如有 3 個(gè) topic,每個(gè) topic 有 3 個(gè) partition,一個(gè)消費(fèi)者組中有 2 個(gè)消費(fèi)者。消費(fèi)者1訂閱 topic1 和 topic2,消費(fèi)者2訂閱 topic2 和 topic3。那么這樣的場景,使用輪訓(xùn)的方式訂閱 topic 就會有問題。
?
如果是下面這種方式訂閱呢?
?
比如有2個(gè) topic,每個(gè) topic 有3個(gè) partition,一個(gè)消費(fèi)者組 有2個(gè)消費(fèi)者,消費(fèi)者1訂閱 topic1,消費(fèi)者2訂閱 topic2,這樣使用輪訓(xùn)的方式訂閱 topic 也會有問題。
?
所以我們一直強(qiáng)調(diào),使用輪訓(xùn)的方式訂閱 topic 的前提是一個(gè)消費(fèi)者組中的所有消費(fèi)者訂閱的主題是一樣的;所以輪詢的方式不是 Kafka 默認(rèn)的方式;Range?是按照單個(gè) topic 來劃分的,默認(rèn)的分配方式。
?
?
?
?
Range 的問題會出現(xiàn)消費(fèi)者數(shù)據(jù)不均衡的問題。比如下面的例子,一個(gè)消費(fèi)者組訂閱了 2 個(gè) topic,就會出現(xiàn)消費(fèi)者1消費(fèi) 4 個(gè) partition,而另外一個(gè)消費(fèi)者只消費(fèi) 2 個(gè) partition。
?
?
分區(qū)策略什么時(shí)候會觸發(fā)呢?當(dāng)消費(fèi)者組里的消費(fèi)者個(gè)數(shù)變化的時(shí)候,會觸發(fā)分區(qū)策略調(diào)整,比如消費(fèi)者里增加消費(fèi)者,或者減少消費(fèi)者。
?
3.4.3?維護(hù)?offset
?
由于消費(fèi)者在消費(fèi)過程中可能會出現(xiàn)斷電宕機(jī)等故障,消費(fèi)者恢復(fù)后,需要從故障前的位置繼續(xù)消費(fèi),所以消費(fèi)者需要實(shí)施記錄自己消費(fèi)哪個(gè) offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。
?
Offset保存的位置有2個(gè),一個(gè) ZooKeeper,一個(gè)是 Kafka。首先看下 offset 保存到?ZooKeeper,由消費(fèi)者組、topic、partition 三個(gè)元素確定唯一的 offset。
?
?
所以消費(fèi)者組中的某個(gè)消費(fèi)者掛掉之后,或者的消費(fèi)者還是可以拿到這個(gè) offset。
?
?
Controller 這個(gè)節(jié)點(diǎn)和?ZooKeeper?通信,同步數(shù)據(jù),這個(gè)節(jié)點(diǎn)就是誰先起來,誰就先注冊 controller,誰就是 controller。其他節(jié)點(diǎn)和 controller 信息保持同步。
?
3.4.5 消費(fèi)者組的案例
?
修改消費(fèi)者組 id
?
?
啟動一個(gè)消費(fèi)者發(fā)送 3 條數(shù)據(jù)。
?
?
指定消費(fèi)者組啟動消費(fèi)者,啟動三個(gè)消費(fèi)者,可以看到每個(gè)消費(fèi)者消費(fèi)了一條數(shù)據(jù)。
?
?
?
?
在演示下不同組可以消費(fèi)同一個(gè) topic 的,我們看到 2 個(gè)消費(fèi)者的消費(fèi)者都消費(fèi)到同一條數(shù)據(jù)。再次啟動一個(gè)消費(fèi)者,這個(gè)消費(fèi)者屬于另外一個(gè)消費(fèi)者組。
?
?
?
4?Kafka 的高效讀寫機(jī)制
?
4.1 分布式部署
?
多節(jié)點(diǎn)并行操作。
?
4.2、順序?qū)懘疟P
?
Kafka 的 producer 生產(chǎn)數(shù)據(jù),要寫入到 log 文件中,寫的過程中一直追加到文件末尾,為順序?qū)?#xff0c;官網(wǎng)有數(shù)據(jù)表明。同樣的磁盤,順序?qū)懩艿?600M/S,而隨機(jī)寫只有 100K/S。這與磁盤的機(jī)械結(jié)構(gòu)有關(guān),順序?qū)懼钥?#xff0c;是因?yàn)槠涫∪チ舜罅看蓬^尋址的時(shí)間。
?
4.3、零復(fù)制技術(shù)
?
正常情況下,先把數(shù)據(jù)讀到內(nèi)核空間,在從內(nèi)核空間把數(shù)據(jù)讀到用戶空間,然后在調(diào)操作系統(tǒng)的 IO 接口寫到內(nèi)核空間,最終在寫到硬盤中。
?
?
Kafka 是這樣做的,直接在內(nèi)核空間流轉(zhuǎn) IO 流,所以 Kafka 的性能非常高。
?
?
?
5. ZooKeeper 在 Kafka 中的作用
?
Kafka 集群中有一個(gè) broker 會被選舉為 controller,負(fù)責(zé)管理集群 broker 的上下線,所有的 topic 的分區(qū)副本分配和 leader 選舉等工作。
總結(jié)
以上是生活随笔為你收集整理的临近春招,Kafka是不是忘完了,给你一文概括Kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: There is no configur
- 下一篇: 来个硬货——长文解读:基于业务场景的My