Linux环境Kafka安装配置
Linux環(huán)境Kafka安裝配置
1. 認識Kafa
(1) Kafa介紹
- 開源消息系統(tǒng)
- 官網:kafka.apache.org/
- 用途:在流式計算中,Kafka一般用來緩存數(shù)據(jù),Storm通過消費Kafka的數(shù)據(jù)進行計算。
- Apache Kafka是一個開源消息系統(tǒng),由Scala寫成。是由Apache軟件基金會開發(fā)的一個開源消息系統(tǒng)項目。Kafka最初是由LinkedIn公司開發(fā),并于2011年初開源。2012年10月從Apache Incubator畢業(yè)。該項目的目標是為處理實時數(shù)據(jù)提供一個統(tǒng)一、高通量、低等待的平臺。Kafka是一個分布式消息隊列。Kafka對消息保存時根據(jù)Topic進行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper集群保存一些meta信息,來保證系統(tǒng)可用性。
(2) 消息隊列內部實現(xiàn)原理
- <1>. 點對點模式(類似接受文件,一對一,消費者主動拉取數(shù)據(jù),消息收到后消息清除) -點對點模型通常是一個基于拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特點是發(fā)送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監(jiān)聽者也是如此。
- <2>. 發(fā)布/訂閱模式(類似公眾號,一對多,數(shù)據(jù)生產后,推送給所有訂閱者)
- 發(fā)布訂閱模型則是一個基于推送的消息傳送模型。發(fā)布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監(jiān)聽主題時才接收消息,而持久訂閱者則監(jiān)聽主題的所有消息,即使當前訂閱者不可用,處于離線狀態(tài)。
(3) 為什么需要消息隊列
- <1>. 解耦:
- 允許獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
- <2>. 冗余:
- 消息隊列把數(shù)據(jù)進行持久化直到它們已經被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。
- <3>. 擴展性:
- 因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
- <4>. 靈活性 & 峰值處理能力:
- 在訪問量劇增的情況下,應用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負荷的請求而完全崩潰。
- <5>. 可恢復性:
- 系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復后被處理。
- <6>. 順序保證:
- 在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)
- <7>. 緩沖:
- 有助于控制和優(yōu)化數(shù)據(jù)流經過系統(tǒng)的速度,解決生產消息和消費消息的處理速度不一致的情況。
- <8>. 異步通信:
- 很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
(4) Kafka架構
- **<1>. Producer:**消息生產者,就是向kafka broker發(fā)消息的客戶端。
- **<2>. Consumer:**消息消費者,向kafka broker取 消息的客戶端
- **<3>. Topic:**可以理解為一個隊列。
- **<4>. Consumer Group(CG):**消費者組,kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內的所有消費者協(xié)調在一起來消費訂閱主題(subscribed topics)的所有分區(qū)(partition)。當然,每個分區(qū)只能由同一個消費組內的一個consumer來消費。
- **<5>. Broker:**一臺kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
- **<6>. Partition:**為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發(fā)給consumer,不保證一個topic的整體(多個partition間)的順序。
- **<7>. Offset:**kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka
(5) 分布式模型
- Kafka每個主題的多個分區(qū)日志分布式地存儲在Kafka集群上,同時為了故障容錯,每個分區(qū)都會以副本的方式復制到多個消息代理節(jié)點上。其中一個節(jié)點會作為主副本(Leader),其他節(jié)點作為備份副本(Follower,也叫作從副本)。主副本會負責所有的客戶端讀寫操作,備份副本僅僅從主副本同步數(shù)據(jù)。當主副本出現(xiàn)故障時,備份副本中的一個副本會被選擇為新的主副本。因為每個分區(qū)的副本中只有主副本接受讀寫,所以每個服務器端都會作為某些分區(qū)的主副本,以及另外一些分區(qū)的備份副本,這樣Kafka集群的所有服務端整體上對客戶端是負載均衡的。
- Kafka的生產者和消費者相對于服務器端而言都是客戶端。Kafka生產者客戶端發(fā)布消息到服務端的指定主題,會指定消息所屬的分區(qū)。生產者發(fā)布消息時根據(jù)消息是否有鍵,采用不同的分區(qū)策略。消息沒有鍵時,通過輪詢方式進行客戶端負載均衡;消息有鍵時,根據(jù)分區(qū)語義(例如hash)確保相同鍵的消息總是發(fā)送到同一分區(qū)。
- Kafka的消費者通過訂閱主題來消費消息,并且每個消費者都會設置一個消費組名稱。因為生產者發(fā)布到主題的每一條消息都只會發(fā)送給消費者組的一個消費者。所以,如果要實現(xiàn)傳統(tǒng)消息系統(tǒng)的**“隊列”模型**,可以讓每個消費者都擁有相同的消費組名稱,這樣消息就會負責均衡到所有的消費者;如果要實現(xiàn)**“發(fā)布-訂閱”模型**,則每個消費者的消費者組名稱都不相同,這樣每條消息就會廣播給所有的消費者。
- 分區(qū)是消費者現(xiàn)場模型的最小并行單位。如下圖(圖1)所示,生產者發(fā)布消息到一臺服務器的3個分區(qū)時,只有一個消費者消費所有的3個分區(qū)。在下圖(圖2)中,3個分區(qū)分布在3臺服務器上,同時有3個消費者分別消費不同的分區(qū)。假設每個服務器的吞吐量時300MB,在下圖(圖1)中分攤到每個分區(qū)只有100MB,而在下圖(圖2)中,集群整體的吞吐量有900MB。可以看到,增加服務器節(jié)點會提升集群的性能,增加消費者數(shù)量會提升處理性能。
- 同一個消費組下多個消費者互相協(xié)調消費工作,Kafka會將所有的分區(qū)平均地分配給所有的消費者實例,這樣每個消費者都可以分配到數(shù)量均等的分區(qū)。Kafka的消費組管理協(xié)議會動態(tài)地維護消費組的成員列表,當一個新消費者加入消費者組,或者有消費者離開消費組,都會觸發(fā)再平衡操作。
- Kafka的消費者消費消息時,只保證在一個分區(qū)內的消息的完全有序性,并不保證同一個主題匯中多個分區(qū)的消息順序。而且,消費者讀取一個分區(qū)消息的順序和生產者寫入到這個分區(qū)的順序是一致的。比如,生產者寫入“hello”和“Kafka”兩條消息到分區(qū)P1,則消費者讀取到的順序也一定是“hello”和“Kafka”。如果業(yè)務上需要保證所有消息完全一致,只能通過設置一個分區(qū)完成,但這種做法的缺點是最多只能有一個消費者進行消費。
- 一般來說,只需要保證每個分區(qū)的有序性,再對消息鍵(message Key可以是user id等)來保證相同鍵的所有消息落入同一分區(qū),就可以滿足絕大多數(shù)的應用。
- kafka讀寫流程圖:
3. Kafka-2.11安裝流程
(1) 準備工作
- ZooKeeper集群環(huán)境
(2) 解壓kafka_2.11-2.1.1.tgz安裝包到目標目錄下:
- tar -zxvf .tgz -C 目標目錄
(3) 為后續(xù)方便,重命名Kafka文件夾:
- mv kafka_2.11-2.1.1/ kafka_2.11
(4) 在/opt/module/kafka目錄下創(chuàng)建logs文件夾
- mkdir logs
(5) 修改配置文件
- cd config/
- vi server.properties
- #broker的全局唯一編號,不能重復 broker.id=0 #是否允許刪除topic delete.topic.enable=true #處理網絡請求的線程數(shù)量 num.network.threads=3 #用來處理磁盤IO的線程數(shù)量 num.io.threads=8 #發(fā)送套接字的緩沖區(qū)大小 socket.send.buffer.bytes=102400 #接收套接字的緩沖區(qū)大小 socket.receive.buffer.bytes=102400 #請求套接字的最大緩沖區(qū)大小 socket.request.max.bytes=104857600 #kafka運行日志存放的路徑 log.dirs=/opt/module/kafka/logs #topic在當前broker上的分區(qū)個數(shù) num.partitions=1 #用來恢復和清理data下數(shù)據(jù)的線程數(shù)量 num.recovery.threads.per.data.dir=1 #segment文件保留的最長時間,超時將被刪除 log.retention.hours=168 #配置連接Zookeeper集群地址 zookeeper.connect=XXXX:2181,XXXX:2181,XXXX:2181 復制代碼
- <1>. Broker配置信息
- <2>. Producer配置信息
- <3>.Consumer配置信息
(6) 配置環(huán)境變量:
- 修改配置文件:
- vi /etc/profile
- 增加以下內容:
- export KAFKA_HOME=kafka安裝路徑
- export PATH=$PATH:$KAFKA_HOME/bin
- 聲明環(huán)境變量:
- source /etc/profile
(7) 集群配置:
- 拷貝配置好的kafka到其他機器上
- scp -r kafka_2.11/ bigdata02:$PWD
- scp -r kafka_2.11/ bigdata03:$PWD
- scp -r kafka_2.11/ bigdata02:$PWD
- 修改配置信息broker.id (注:broker.id不得重復)
- 配置相應環(huán)境變量
(8) 啟動集群
- 依次在bigdata11、bigdata12、bigdata13節(jié)點上啟動kafka(加上&是在后臺啟動)
- kafka-server-start.sh config/server.properties &
(9) 關閉集群
- kafka-server-stop.sh stop
4. Kafka命令行操作
(1) 查看當前服務器中的所有topic
- kafka-topics.sh --zookeeper XXXX:2181 --list
(2) 創(chuàng)建topic
- kafka-topics.sh --zookeeper XXXX:2181 --create --replication-factor 3 --partitions 1 --topic 名稱
- 選項說明:
- --topic 定義topic名
- --replication-factor 定義副本數(shù)
- --partitions 定義分區(qū)數(shù)
(3) 刪除topic
- kafka-topics.sh --zookeeper XXXX:2181 --delete --topic first
- 需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。
(4) 發(fā)送消息
- kafka-console-producer.sh --broker-list XXXX:9092 --topic first
(5) 消費消息
- kafka-console-consumer.sh --bootstrap-server node3:9092 --from-beginning --topic first
- --from-beginning:會把first主題中以往所有的數(shù)據(jù)都讀取出來。根據(jù)業(yè)務場景選擇是否增加該配置。
(6) 查看某個Topic的詳情
- kafka-topics.sh --zookeeper XXXX:2181 --describe --topic first
5. Kafka工作流程分析
(1) Kafka生產過程分析
-
<1>. 寫入方式
- producer采用推(push)模式將消息發(fā)布到broker,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障kafka吞吐率)。
-
<2>. 分區(qū)(Partition)
-
Kafka集群有多個消息代理服務器(broker-server)組成,發(fā)布到Kafka集群的每條消息都有一個類別,用主題(topic)來表示。通常,不同應用產生不同類型的數(shù)據(jù),可以設置不同的主題。一個主題一般會有多個消息的訂閱者,當生產者發(fā)布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生成者寫入的新消息。
-
Kafka集群為每個主題維護了分布式的分區(qū)(partition)日志文件,物理意義上可以把主題(topic)看作進行了分區(qū)的日志文件(partition log)。主題的每個分區(qū)都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日志中。分區(qū)中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,叫做偏移量(offset),這個偏移量能夠唯一地定位當前分區(qū)中的每一條消息。
-
消息發(fā)送時都被發(fā)送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區(qū)日志)組成,其組織結構如下圖所示:
-
下圖中的topic有3個分區(qū),每個分區(qū)的偏移量都從0開始,不同分區(qū)之間的偏移量都是獨立的,不會相互影響。
-
可以看到,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
-
發(fā)布到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務器端的指定分區(qū)后,都會分配到一個自增的偏移量。**原始的消息內容和分配的偏移量以及其他一些元數(shù)據(jù)信息最后都會存儲到分區(qū)日志文件中。**消息的鍵也可以不用設置,這種情況下消息會均衡地分布到不同的分區(qū)。
-
分區(qū)的原因:
- a. 方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數(shù)據(jù);
- b. 可以提高并發(fā),因為可以以Partition為單位讀寫。
- 傳統(tǒng)消息系統(tǒng)在服務端保持消息的順序,如果有多個消費者消費同一個消息隊列,服務端會以消費存儲的順序依次發(fā)送給消費者。**但由于消息是異步發(fā)送給消費者的,消息到達消費者的順序可能是無序的,這就意味著在并行消費時,傳統(tǒng)消息系統(tǒng)無法很好地保證消息被順序處理。**雖然我們可以設置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,但是這就使得消費處理無法真正執(zhí)行。
- Kafka比傳統(tǒng)消息系統(tǒng)有更強的順序性保證,它使用主題的分區(qū)作為消息處理的并行單元。**Kafka以分區(qū)作為最小的粒度,將每個分區(qū)分配給消費者組中不同的而且是唯一的消費者,并確保一個分區(qū)只屬于一個消費者,即這個消費者就是這個分區(qū)的唯一讀取線程。那么,只要分區(qū)的消息是有序的,消費者處理的消息順序就有保證。**每個主題有多個分區(qū),不同的消費者處理不同的分區(qū),所以Kafka不僅保證了消息的有序性,也做到了消費者的負載均衡。
-
分區(qū)的原則:
- a. 指定了patition,則直接使用;
- b. 未指定patition但指定key,通過對key的value進行hash出一個patition
- c. patition和key都未指定,使用輪詢選出一個patition。
-
-
<3>. 副本(Replication) 同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的數(shù)據(jù)都不可被消費,同時producer也不能再將數(shù)據(jù)存于其上的patition。引入replication之后,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication作為follower從leader 中復制數(shù)據(jù)。
-
<4>. 寫入流程
- producer寫入消息流程如下:
- a. producer先從zookeeper的 "/brokers/.../state"節(jié)點找到該partition的leader
- b. producer將消息發(fā)送給該leader
- c. leader將消息寫入本地log
- d. followers從leader pull消息,寫入本地log后向leader發(fā)送ACK
- a. leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer發(fā)送ACK
- producer寫入消息流程如下:
(2) Broker 保存消息
- a. 存儲方式
- 物理上把topic分成一個或多個patition(對應 server.properties 中的num.partitions=3配置),每個patition物理上對應一個文件夾(該文件夾存儲該patition的所有消息和索引文件)。
- b. 存儲策略
- 無論消息是否被消費,kafka都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):
- 基于時間:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
- 需要注意的是,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除過期文件與提高 Kafka 性能無關。
- 無論消息是否被消費,kafka都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):
(3) Zookeeper存儲結構
- 注意:producer不在zk中注冊,消費者在zk中注冊。
6. Kafka消費過程分析
- kafka提供了兩套consumer API:高級Consumer API和低級API。
(1) 消費模型
-
消息由生產者發(fā)布到Kafka集群后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。
-
基于推送模型(push)的消息系統(tǒng),由消息代理記錄消費者的消費狀態(tài)。消息代理在將消息推送到消費者后,標記這條消息為已消費,但這種方式無法很好地保證消息被處理。比如,消息代理把消息發(fā)送出去后,當消費進程掛掉或者由于網絡原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經把這條消息標記為已消費了,但實際上這條消息并沒有被實際處理)。如果要保證消息被處理,消息代理發(fā)送完消息后,要設置狀態(tài)為“已發(fā)送”,只有收到消費者的確認請求后才更新為“已消費”,這就需要消息代理中記錄所有的消費狀態(tài),這種做法顯然是不可取的。
-
Kafka采用拉取模型,**由消費者自己記錄消費狀態(tài),每個消費者互相獨立地順序讀取每個分區(qū)的消息。**如下圖所示,有兩個消費者(不同消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限通過最高水位(watermark)控制,生產者最新寫入的消息如果還沒有達到備份數(shù)量,對消費者是不可見的。這種由消費者控制偏移量的優(yōu)點是:**消費者可以按照任意的順序消費消息。**比如,消費者可以重置到舊的偏移量,重新處理之前已經消費過的消息;或者直接跳到最近的位置,從當前的時刻開始消費。
-
在一些消息系統(tǒng)中,消息代理會在消息被消費之后立即刪除消息。如果有不同類型的消費者訂閱同一個主題,消息代理可能需要冗余地存儲同一消息;或者等所有消費者都消費完才刪除,這就需要消息代理跟蹤每個消費者的消費狀態(tài),這種設計很大程度上限制了消息系統(tǒng)的整體吞吐量和處理延遲。Kafka的做法是生產者發(fā)布的所有消息會一致保存在Kafka集群中,不管消息有沒有被消費。用戶可以通過設置保留時間來清理過期的數(shù)據(jù),比如,設置保留策略為兩天。那么,在消息發(fā)布之后,它可以被不同的消費者消費,在兩天之后,過期的消息就會自動清理掉。
(2) 高級API
- a. 高級API優(yōu)點
- 高級API寫起來簡單;
- 不需要自行去管理offset,系統(tǒng)通過zookeeper自行管理;
- 不需要管理分區(qū),副本等情況,系統(tǒng)自動管理;
- 消費者斷線會自動根據(jù)上一次記錄在zookeeper中的offset去接著獲取數(shù)據(jù)(默認設置1分鐘更新一下zookeeper中存的offset)
- 可以使用group來區(qū)分對同一個topic 的不同程序訪問分離開來(不同的group記錄不同的offset,這樣不同程序讀取同一個topic才不會因為offset互相影響)
- b. 高級API缺點
- 不能自行控制offset(對于某些特殊需求來說)
- 不能細化控制如分區(qū)、副本、zk等
(3) 低級API
- a. 低級 API 優(yōu)點
- 能夠讓開發(fā)者自己控制offset,想從哪里讀取就從哪里讀取。
- 自行控制連接分區(qū),對分區(qū)自定義進行負載均衡
- 對zookeeper的依賴性降低(如:offset不一定非要靠zk存儲,自行存儲offset即可,比如存在文件或者內存中)
- b. 低級API缺點
- 太過復雜,需要自行控制offset,連接哪個分區(qū),找到分區(qū)leader 等。
(4) 消費者組
- 消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區(qū)在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區(qū),另外兩個分別讀取一個分區(qū)。某個消費者讀取某個分區(qū),也可以叫做某個消費者是某個分區(qū)的擁有者。
- 在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分區(qū)。
(5) 消費方式
- consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
- push(推)模式很難適應消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務以及網絡擁塞。而pull模式則可以根據(jù)consumer的消費能力以適當?shù)乃俾氏M消息。
- 對于Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。
- pull模式不足之處是,如果kafka沒有數(shù)據(jù),消費者可能會陷入循環(huán)中,一直等待數(shù)據(jù)到達。為了避免這種情況,我們在我們的拉請求中有參數(shù),允許消費者請求在等待數(shù)據(jù)到達的“長輪詢”中進行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大小)。
轉載于:https://juejin.im/post/5ce377c9f265da1bb679ec3c
總結
以上是生活随笔為你收集整理的Linux环境Kafka安装配置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Eclipse:Cannot compl
- 下一篇: 数据处理算法链接 DATA MINING