Kafka入门教程(转载)
原文地址:https://blog.csdn.net/dapeng1995/article/details/81536862
1.1?消息隊(duì)列(Message Queue)
Message Queue消息傳送系統(tǒng)提供傳送服務(wù)。消息傳送依賴于大量支持組件,這些組件負(fù)責(zé)處理連接服務(wù)、消息的路由和傳送、持久性、安全性以及日志記錄。消息服務(wù)器可以使用一個(gè)或多個(gè)代理實(shí)例。
JMS(Java Messaging Service)是Java平臺(tái)上有關(guān)面向消息中間件(MOM)的技術(shù)規(guī)范,它便于消息系統(tǒng)中的Java應(yīng)用程序進(jìn)行消息交換,并且通過(guò)提供標(biāo)準(zhǔn)的產(chǎn)生、發(fā)送、接收消息的接口簡(jiǎn)化企業(yè)應(yīng)用的開(kāi)發(fā),翻譯為Java消息服務(wù)。
?
1.2?MQ消息模型
KafkaMQ消息模型圖1-1
?
1.3?MQ消息隊(duì)列分類
消息隊(duì)列分類:點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱兩種:
1、點(diǎn)對(duì)點(diǎn):
消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息。
消息被消費(fèi)以后,queue中不再有存儲(chǔ),所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個(gè)消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。
2、發(fā)布/訂閱:
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到topic的消息會(huì)被所有訂閱者消費(fèi)。
?
1.4?MQ消息隊(duì)列對(duì)比
1、RabbitMQ:支持的協(xié)議多,非常重量級(jí)消息隊(duì)列,對(duì)路由(Routing),負(fù)載均衡(Loadbalance)或者數(shù)據(jù)持久化都有很好的支持。
?
2、ZeroMQ:號(hào)稱最快的消息隊(duì)列系統(tǒng),尤其針對(duì)大吞吐量的需求場(chǎng)景,擅長(zhǎng)的高級(jí)/復(fù)雜的隊(duì)列,但是技術(shù)也復(fù)雜,并且只提供非持久性的隊(duì)列。
?
3、ActiveMQ:Apache下的一個(gè)子項(xiàng),類似ZeroMQ,能夠以代理人和點(diǎn)對(duì)點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列。
?
4、Redis:是一個(gè)key-Value的NOSql數(shù)據(jù)庫(kù),但也支持MQ功能,數(shù)據(jù)量較小,性能優(yōu)于RabbitMQ,數(shù)據(jù)超過(guò)10K就慢的無(wú)法忍受。
?
1.5?Kafka簡(jiǎn)介
Kafka是分布式發(fā)布-訂閱消息系統(tǒng),它最初由?LinkedIn?公司開(kāi)發(fā),使用?Scala語(yǔ)言編寫(xiě),之后成為?Apache?項(xiàng)目的一部分。在Kafka集群中,沒(méi)有“中心主節(jié)點(diǎn)”的概念,集群中所有的服務(wù)器都是對(duì)等的,因此,可以在不做任何配置的更改的情況下實(shí)現(xiàn)服務(wù)器的的添加與刪除,同樣的消息的生產(chǎn)者和消費(fèi)者也能夠做到隨意重啟和機(jī)器的上下線。
?
Kafka消息系統(tǒng)生產(chǎn)者和消費(fèi)者部署關(guān)系圖1-2
Kafka消息系統(tǒng)架構(gòu)圖1-3
?
1.6?Kafka術(shù)語(yǔ)介紹
1、消息生產(chǎn)者:即:Producer,是消息的產(chǎn)生的源頭,負(fù)責(zé)生成消息并發(fā)送到Kafka
服務(wù)器上。
?
2、消息消費(fèi)者:即:Consumer,是消息的使用方,負(fù)責(zé)消費(fèi)Kafka服務(wù)器上的消息。
?
3、主題:即:Topic,由用戶定義并配置在Kafka服務(wù)器,用于建立生產(chǎn)者和消息者之間的訂閱關(guān)系:生產(chǎn)者發(fā)送消息到指定的Topic下,消息者從這個(gè)Topic下消費(fèi)消息。
?
4、消息分區(qū):即:Partition,一個(gè)Topic下面會(huì)分為很多分區(qū),例如:“kafka-test”這個(gè)Topic下可以分為6個(gè)分區(qū),分別由兩臺(tái)服務(wù)器提供,那么通常可以配置為讓每臺(tái)服務(wù)器提供3個(gè)分區(qū),假如服務(wù)器ID分別為0、1,則所有的分區(qū)為0-0、0-1、0-2和1-0、1-1、1-2。Topic物理上的分組,一個(gè)?topic可以分為多個(gè)?partition,每個(gè)?partition?是一個(gè)有序的隊(duì)列。partition中的每條消息都會(huì)被分配一個(gè)有序的?id(offset)。
?
5、Broker:即Kafka的服務(wù)器,用戶存儲(chǔ)消息,Kafa集群中的一臺(tái)或多臺(tái)服務(wù)器統(tǒng)稱為?broker。
?
6、消費(fèi)者分組:Group,用于歸組同類消費(fèi)者,在Kafka中,多個(gè)消費(fèi)者可以共同消息一個(gè)Topic下的消息,每個(gè)消費(fèi)者消費(fèi)其中的部分消息,這些消費(fèi)者就組成了一個(gè)分組,擁有同一個(gè)分組名稱,通常也被稱為消費(fèi)者集群。
?
7、Offset:消息存儲(chǔ)在Kafka的Broker上,消費(fèi)者拉取消息數(shù)據(jù)的過(guò)程中需要知道消息在文件中的偏移量,這個(gè)偏移量就是所謂的Offset。
?
1.7?Kafka中Broker
1、Broker:即Kafka的服務(wù)器,用戶存儲(chǔ)消息,Kafa集群中的一臺(tái)或多臺(tái)服務(wù)器統(tǒng)稱為?broker。
?
2、Message在Broker中通Log追加的方式進(jìn)行持久化存儲(chǔ)。并進(jìn)行分區(qū)(patitions)。
?
3、為了減少磁盤(pán)寫(xiě)入的次數(shù),broker會(huì)將消息暫時(shí)buffer起來(lái),當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再flush到磁盤(pán),這樣減少了磁盤(pán)IO調(diào)用的次數(shù)。
?
4、Broker沒(méi)有副本機(jī)制,一旦broker宕機(jī),該broker的消息將都不可用。Message消息是有多份的。
?
5、Broker不保存訂閱者的狀態(tài),由訂閱者自己保存。
?
6、無(wú)狀態(tài)導(dǎo)致消息的刪除成為難題(可能刪除的消息正在被訂閱),kafka采用基于時(shí)間的SLA(服務(wù)水平保證),消息保存一定時(shí)間(通常為7天)后會(huì)被刪除。
?
7、消息訂閱者可以rewind back到任意位置重新進(jìn)行消費(fèi),當(dāng)訂閱者故障時(shí),可以選擇最小的offset(id)進(jìn)行重新讀取消費(fèi)消息。
?
1.8?Kafka的Message組成
1、Message消息:是通信的基本單位,每個(gè)?producer?可以向一個(gè)?topic(主題)發(fā)布一些消息。
?
2、Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨(dú)立的。每個(gè)topic又可以分成幾個(gè)不同的partition(每個(gè)topic有幾個(gè)partition是在創(chuàng)建topic時(shí)指定的),每個(gè)partition存儲(chǔ)一部分Message。
?
3、partition中的每條Message包含了以下三個(gè)屬性:
offset ?????即:消息唯一標(biāo)識(shí):對(duì)應(yīng)類型:long
MessageSize?對(duì)應(yīng)類型:int32
data????? ??是message的具體內(nèi)容。
?
1.9?Kafka的Partitions分區(qū)
1、Kafka基于文件存儲(chǔ).通過(guò)分區(qū),可以將日志內(nèi)容分散到多個(gè)server上,來(lái)避免文件尺寸達(dá)到單機(jī)磁盤(pán)的上限,每個(gè)partiton都會(huì)被當(dāng)前server(kafka實(shí)例)保存。
?
2、可以將一個(gè)topic切分多任意多個(gè)partitions,來(lái)消息保存/消費(fèi)的效率。
?
3、越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力。
?
1.10?Kafka的Consumers
1、消息和數(shù)據(jù)消費(fèi)者,訂閱?topics并處理其發(fā)布的消息的過(guò)程叫做?consumers。
?
2、在?kafka中,我們可以認(rèn)為一個(gè)group是一個(gè)“訂閱者”,一個(gè)Topic中的每個(gè)partions,只會(huì)被一個(gè)“訂閱者”中的一個(gè)consumer消費(fèi),不過(guò)一個(gè)?consumer可以消費(fèi)多個(gè)partitions中的消息(消費(fèi)者數(shù)據(jù)小于Partions的數(shù)量時(shí))。注意:kafka的設(shè)計(jì)原理決定,對(duì)于一個(gè)topic,同一個(gè)group中不能有多于partitions個(gè)數(shù)的consumer同時(shí)消費(fèi),否則將意味著某些consumer將無(wú)法得到消息。
?
3、一個(gè)partition中的消息只會(huì)被group中的一個(gè)consumer消息。每個(gè)group中consumer消息消費(fèi)互相獨(dú)立。
?
1.11?Kafka的持久化
1、一個(gè)Topic可以認(rèn)為是一類消息,每個(gè)topic將被分成多partition(區(qū)),每個(gè)partition在存儲(chǔ)層面是append log文件。任何發(fā)布到此partition的消息都會(huì)被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),partition是以文件的形式存儲(chǔ)在文件系統(tǒng)中。
?
2、Logs文件根據(jù)broker中的配置要求,保留一定時(shí)間后刪除來(lái)釋放磁盤(pán)空間。
????????
Kafka消息分區(qū)Partition圖1-4
?
Partition:
?? Topic物理上的分組,一個(gè)?topic可以分為多個(gè)?partition,每個(gè)?partition?是一個(gè)有序的隊(duì)列。partition中的每條消息都會(huì)被分配一個(gè)有序的?id(offset)。
?
3、為數(shù)據(jù)文件建索引:稀疏存儲(chǔ),每隔一定字節(jié)的數(shù)據(jù)建立一條索引。下圖為一個(gè)partition的索引示意圖:
???????
Kafka消息分區(qū)Partition索引圖1-5
1.12?Kafka的分布式實(shí)現(xiàn):
?????
Kafka分布式關(guān)系圖1-6
????????
Kafka生產(chǎn)環(huán)境關(guān)系圖1-7
1.13?Kafka的通訊協(xié)議:
1、Kafka的Producer、Broker和Consumer之間采用的是一套自行設(shè)計(jì)基于TCP層的協(xié)議,根據(jù)業(yè)務(wù)需求定制,而非實(shí)現(xiàn)一套類似ProtocolBuffer的通用協(xié)議。
?
2、基本數(shù)據(jù)類型:(Kafka是基于Scala語(yǔ)言實(shí)現(xiàn)的,類型也是Scala中的數(shù)據(jù)類型)
?
定長(zhǎng)數(shù)據(jù)類型:int8,int16,int32和int64,對(duì)應(yīng)到Java中就是byte, short, int和long。
?
變長(zhǎng)數(shù)據(jù)類型:bytes和string。變長(zhǎng)的數(shù)據(jù)類型由兩部分組成,分別是一個(gè)有符號(hào)整數(shù)N(表示內(nèi)容的長(zhǎng)度)和N個(gè)字節(jié)的內(nèi)容。其中,N為-1表示內(nèi)容為null。bytes的長(zhǎng)度由int32表示,string的長(zhǎng)度由int16表示。
?
數(shù)組:數(shù)組由兩部分組成,分別是一個(gè)由int32類型的數(shù)字表示的數(shù)組長(zhǎng)度N和N個(gè)元素。
?
3、Kafka通訊的基本單位是Request/Response。
?
4、基本結(jié)構(gòu):
RequestOrResponse => MessageSize(RequestMessage | ResponseMessage)
| 名稱 | 類型 | 描術(shù) |
| MessageSize | int32 | 表示RequestMessage或者ResponseMessage的長(zhǎng)度 ? |
| RequestMessage ResponseMessage | — | ? |
?
?
5、通訊過(guò)程:
客戶端打開(kāi)與服務(wù)器端的Socket
往Socket寫(xiě)入一個(gè)int32的數(shù)字(數(shù)字表示這次發(fā)送的Request有多少字節(jié))
服務(wù)器端先讀出一個(gè)int32的整數(shù)從而獲取這次Request的大小
然后讀取對(duì)應(yīng)字節(jié)數(shù)的數(shù)據(jù)從而得到Request的具體內(nèi)容
服務(wù)器端處理了請(qǐng)求后,也用同樣的方式來(lái)發(fā)送響應(yīng)。
?
6、RequestMessage結(jié)構(gòu):
RequestMessage => ApiKey ApiVersionCorrelationId ClientId Request
| 名稱 | 類型 | 描術(shù) |
| ApiKey | int16 | 表示這次請(qǐng)求的API編號(hào) |
| ApiVersion | int16 | 表示請(qǐng)求的API的版本,有了版本后就可以做到后向兼容 |
| CorrelationId | int32 | 由客戶端指定的一個(gè)數(shù)字唯一標(biāo)示這次請(qǐng)求的id,服務(wù)器端在處理完請(qǐng)求后也會(huì)把同樣的CorrelationId寫(xiě)到Response中,這樣客戶端就能把某個(gè)請(qǐng)求和響應(yīng)對(duì)應(yīng)起來(lái)了。 |
| ClientId | string | 客戶端指定的用來(lái)描述客戶端的字符串,會(huì)被用來(lái)記錄日志和監(jiān)控,它唯一標(biāo)示一個(gè)客戶端。 |
| Request | — | Request的具體內(nèi)容。 |
?
7、ResponseMessage結(jié)構(gòu):
ResponseMessage => CorrelationId Response
| 名稱 | 類型 | 描術(shù) |
| CorrelationId | int32 | 對(duì)應(yīng)Request的CorrelationId。 |
| Response | — | 對(duì)應(yīng)Request的Response,不同的Request的Response的字段是不一樣的。 |
??????
Kafka采用是經(jīng)典的Reactor(同步IO)模式,也就是1個(gè)Acceptor響應(yīng)客戶端的連接請(qǐng)求,N個(gè)Processor來(lái)讀取數(shù)據(jù),這種模式可以構(gòu)建出高性能的服務(wù)器。
?
8、Message結(jié)構(gòu):
Message:Producer生產(chǎn)的消息,鍵-值對(duì)
Message => Crc MagicByte Attributes KeyValue
| 名稱 | 類型 | 描術(shù) |
| CRC | int32 | 表示這條消息(不包括CRC字段本身)的校驗(yàn)碼。 |
| MagicByte | int8 | 表示消息格式的版本,用來(lái)做后向兼容,目前值為0。 |
| Attributes | int8 | 表示這條消息的元數(shù)據(jù),目前最低兩位用來(lái)表示壓縮格式。 |
| Key | bytes | 表示這條消息的Key,可以為null。 |
| Value | bytes | 表示這條消息的Value。Kafka支持消息嵌套,也就是把一條消息作為Value放到另外一條消息里面。 |
?
9、MessageSet結(jié)構(gòu):
MessageSet:用來(lái)組合多條Message,它在每條Message的基礎(chǔ)上加上了Offset和MessageSize
MessageSet => [Offset MessageSize Message]
| 名稱 | 類型 | 描術(shù) |
| Offset | int64 | 它用來(lái)作為log中的序列號(hào),Producer在生產(chǎn)消息的時(shí)候還不知道具體的值是什么,可以隨便填個(gè)數(shù)字進(jìn)去。 |
| MessageSize | int32 | 表示這條Message的大小。 |
| Message | - | 表示這條Message的具體內(nèi)容,其格式見(jiàn)上一小節(jié)。 |
?
?
10、?????Request/Respone和Message/MessageSet的關(guān)系:
?
Request/Response是通訊層的結(jié)構(gòu),和網(wǎng)絡(luò)的7層模型對(duì)比的話,它類似于TCP層。
?
Message/MessageSet定義的是業(yè)務(wù)層的結(jié)構(gòu),類似于網(wǎng)絡(luò)7層模型中的HTTP層。Message/MessageSet只是Request/Response的payload中的一種數(shù)據(jù)結(jié)構(gòu)。
?
備注:Kafka的通訊協(xié)議中不含Schema,格式也比較簡(jiǎn)單,這樣設(shè)計(jì)的好處是協(xié)議自身的Overhead小,再加上把多條Message放在一起做壓縮,提高壓縮比率,從而在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)量會(huì)少一些。
?
1.14?數(shù)據(jù)傳輸?shù)氖聞?wù)定義:
1、at most once:最多一次,這個(gè)和JMS中”非持久化”消息類似.發(fā)送一次,無(wú)論成敗,將不會(huì)重發(fā)。
at most once:消費(fèi)者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過(guò)程中出現(xiàn)了異常,導(dǎo)致部分消息未能繼續(xù)處理.那么此后”未處理”的消息將不能被fetch到,這就是“atmost once”。
?
2、at least once:消息至少發(fā)送一次,如果消息未能接受成功,可能會(huì)重發(fā),直到接收成功。
at least once:消費(fèi)者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常導(dǎo)致保存操作未能執(zhí)行成功,這就導(dǎo)致接下來(lái)再次fetch時(shí)可能獲得上次已經(jīng)處理過(guò)的消息,這就是“atleast once”,原因offset沒(méi)有及時(shí)的提交給zookeeper,zookeeper恢復(fù)正常還是之前offset狀態(tài)。
?
3、exactly once:消息只會(huì)發(fā)送一次。
exactly once: kafka中并沒(méi)有嚴(yán)格的去實(shí)現(xiàn)(基于2階段提交,事務(wù)),我們認(rèn)為這種策略在kafka中是沒(méi)有必要的。
?
注:通常情況下“at-least-once”是我們首選。(相比at most once而言,重復(fù)接收數(shù)據(jù)總比丟失數(shù)據(jù)要好)。
?
1.15?學(xué)習(xí)Kafka推薦書(shū)籍:
1.?《Apache Kafka》
2.?《從Paxos到Zookeeper分布式一致性原理與實(shí)踐》
總結(jié)
以上是生活随笔為你收集整理的Kafka入门教程(转载)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: python实现WebsocketSer
- 下一篇: css布局,让侧边栏高度撑满,并且不会随