kafka 消息队列
kafka 消息隊列
kafka 架構原理
大數據時代來臨,如果你還不知道Kafka那就真的out了!據統計,有三分之一的世界財富500強企業正在使用Kafka,包括所有TOP10旅游公司,7家TOP10銀行,8家TOP10保險公司,9家TOP10電信公司等等。LinkedIn、Microsoft和Netflix每天都用Kafka處理萬億級的信息。本文就讓我們一起來大白話kafka的架構原理。
kafka官網:http://kafka.apache.org/
01 kafka簡介
Kafka最初由Linkedin公司開發,是一個分布式的、分區的、多副本的、多訂閱者,基于zookeeper協調的分布式日志系統(也可以當做MQ系統),常用于web/nginx日志、訪問日志、消息服務等等,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。
02 kafka的特性
-
高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒;
-
可擴展性:kafka集群支持熱擴展;
-
持久性、可靠性:消息被持久化到本地磁盤,并且支持數據備份防止丟失;
-
容錯性:允許集群中的節點失敗(若分區副本數量為n,則允許n-1個節點失敗);
-
高并發:單機可支持數千個客戶端同時讀寫;
03 kafka的應用場景
-
日志收集:一個公司可以用Kafka收集各種服務的log,通過kafka以統一接口開放給各種消費端,例如hadoop、Hbase、Solr等。
-
消息系統:解耦生產者和消費者、緩存消息等。
-
用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索記錄、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
-
運營指標:Kafka也經常用來記錄運營監控數據。
-
流式處理
04 kafka架構(重頭戲!)
下面是一個kafka的架構圖,
整體來看,kafka架構中包含四大組件:生產者、消費者、kafka集群、zookeeper集群。對照上面的結構圖,我們先來搞清楚幾個很重要的術語,(看圖!對照圖理解~)
1、broker
kafka 集群包含一個或多個服務器,每個服務器節點稱為一個broker。
2、topic
每條發布到kafka集群的消息都有一個類別,這個類別稱為topic,其實就是將消息按照topic來分類,topic就是邏輯上的分類,同一個topic的數據既可以在同一個broker上也可以在不同的broker結點上。
3、partition
分區,每個topic被物理劃分為一個或多個分區,每個分區在物理上對應一個文件夾,該文件夾里面存儲了這個分區的所有消息和索引文件。在創建topic時可指定parition數量,生產者將消息發送到topic時,消息會根據 分區策略 追加到分區文件的末尾,屬于順序寫磁盤,因此效率非常高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。
上面提到了分區策略,所謂分區策略就是決定生產者將消息發送到哪個分區的算法。Kafka 為我們提供了默認的分區策略,同時它也支持自定義分區策略。kafka允許為每條消息設置一個key,一旦消息被定義了 Key,那么就可以保證同一個 Key 的所有消息都進入到相同的分區,這種策略屬于自定義策略的一種,被稱作"按消息key保存策略",或Key-ordering 策略。
同一主題的多個分區可以部署在多個機器上,以此來實現 kafka 的伸縮性。同一partition中的數據是有序的,但topic下的多個partition之間在消費數據時不能保證有序性,在需要嚴格保證消息順序消費的場景下,可以將partition數設為1,但這種做法的缺點是降低了吞吐,一般來說,只需要保證每個分區的有序性,再對消息設置key來保證相同key的消息落入同一分區,就可以滿足絕大多數的應用。
4、offset
partition中的每條消息都被標記了一個序號,這個序號表示消息在partition中的偏移量,稱為offset,每一條消息在partition都有唯一的offset,消息者通過指定offset來指定要消費的消息。
正常情況下,消費者在消費完一條消息后會遞增offset,準備去消費下一條消息,但也可以將offset設成一個較小的值,重新消費一些消費過的消息,可見offset是由consumer控制的,consumer想消費哪一條消息就消費哪一條消息,所以kafka broker是無狀態的,它不需要標記哪些消息被消費過。
5、producer
生產者,生產者發送消息到指定的topic下,消息再根據分配規則append到某個partition的末尾。
6、consumer
消費者,消費者從topic中消費數據。
7、consumer group
消費者組,每個consumer屬于一個特定的consumer group,可為每個consumer指定consumer group,若不指定則屬于默認的group。
同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。這也是kafka用來實現一個topic消息的廣播和單播的手段,如果需要實現廣播,一個consumer group內只放一個消費者即可,要實現單播,將所有的消費者放到同一個consumer group即可。
用consumer group還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。
8、leader
每個partition有多個副本,其中有且僅有一個作為leader,leader會負責所有的客戶端讀寫操作。
9、follower
follower不對外提供服務,只與leader保持數據同步,如果leader失效,則選舉一個follower來充當新的leader。當follower與leader掛掉、卡住或者同步太慢,leader會把這個follower從ISR列表中刪除,重新創建一個follower。
10、rebalance
同一個consumer group下的多個消費者互相協調消費工作,我們這樣想,一個topic分為多個分區,一個consumer group里面的所有消費者合作,一起去消費所訂閱的某個topic下的所有分區(每個消費者消費部分分區),kafka會將該topic下的所有分區均勻的分配給consumer group下的每個消費者,如下圖,
rebalance表示"重平衡",consumer group內某個消費者掛掉后,其他消費者自動重新分配訂閱主題分區的過程,是 Kafka 消費者端實現高可用的重要手段。如下圖Consumer Group A中的C2掛掉,C1會接收P1和P2,以達到重新平衡。同樣的,當有新消費者加入consumer group,也會觸發重平衡操作。
05 對kafka架構的幾點解釋
- 一個典型的kafka集群中包含若干producer,若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干consumer group,以及一個zookeeper集群。kafka通過zookeeper協調管理kafka集群,選舉分區leader,以及在consumer group發生變化時進行rebalance。
- kafka的topic被劃分為一個或多個分區,多個分區可以分布在一個或多個broker節點上,同時為了故障容錯,每個分區都會復制多個副本,分別位于不同的broker節點,這些分區副本中(不管是leader還是follower都稱為分區副本),一個分區副本會作為leader,其余的分區副本作為follower。其中leader負責所有的客戶端讀寫操作,follower不對外提供服務,僅僅從leader上同步數據,當leader出現故障時,其中的一個follower會頂替成為leader,繼續對外提供服務。
- 對于傳統的MQ而言,已經被消費的消息會從隊列中刪除,但在Kafka中被消費的消息也不會立馬刪除,在kafka的server.propertise配置文件中定義了數據的保存時間,當文件到設定的保存時間時才會刪除,
# 數據的保存時間(單位:小時,默認為7天)
log.retention.hours=168
因為Kafka讀取消息的時間復雜度為O(1),與文件大小無關,所以這里刪除過期文件與提高Kafka性能并沒有關系,所以選擇怎樣的刪除策略應該考慮磁盤以及具體的需求。
- 點對點模式 VS 發布訂閱模式
傳統的消息系統中,有兩種主要的消息傳遞模式:點對點模式、發布訂閱模式。
①點對點模式
生產者發送消息到queue中,queue支持存在多個消費者,但是對一個消息而言,只可以被一個消費者消費,并且在點對點模式中,已經消費過的消息會從queue中刪除不再存儲。
②發布訂閱模式
生產者將消息發布到topic中,topic可以被多個消費者訂閱,且發布到topic的消息會被所有訂閱者消費。而kafka就是一種發布訂閱模式。
- 消費端 pull 和 push
① push方式:由消息中間件主動地將消息推送給消費者;
優點:優點是不需要消費者額外開啟線程監控中間件,節省開銷。
缺點:無法適應消費速率不相同的消費者。因為消息的發送速率是broker決定的,而消
費者的處理速度又不盡相同,所以容易造成部分消費者空閑,部分消費者堆積,造成緩
沖區溢出。
② pull方式:由消費者主動向消息中間件拉取消息;
優點:消費端可以按處理能力進行拉取;
缺點:消費端需要另開線程監控中間件,有性能開銷;
對于Kafka而言,pull模式更合適。pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率,同時Consumer可以自己控制消費方式,既可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
06 開啟zookeeper
07 開啟 kafka
如果服務掛了,就刪掉 kafka-logs 的緩存,重新啟動
08 開生產者producer 發送消息
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-A3thYPDq-1663024484450)(C:\Users\lyz123\AppData\Roaming\Typora\typora-user-images\image-20220912200532032.png)]
09 開生產者producer 發送消息
10 開生產者consumer發送消息
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning11 簡單代碼實現
通過@KafkaKistener 注解來實現的,就是一個kafka的監聽器,topics 是復數,監聽一個或多個主題。一但監聽到有消息,就會調用handleMessage 來處理這個 tipic(主題),會把消息包裝成一個ConsumerRecord 傳進來,通過record對象來處理。
- 引入依賴
- yaml配置
- 代碼實現
總之,生產者發消息,是我們主動去調用的,
@RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class KafkaTests {@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka() { //*****阻塞方法,就是等一會才發消息給你,而不是立即發私信kafkaProducer.sendMessage("test", "你好");kafkaProducer.sendMessage("test", "在嗎");try {Thread.sleep(1000 * 10); //時間一過,consumer會自動收到這個消息,調用她修飾的方法} catch (InterruptedException e) {e.printStackTrace();}} }@Component class KafkaProducer { //******生產者@Autowiredprivate KafkaTemplate kafkaTemplate; //生產者被spring 整合了public void sendMessage(String topic, String content) {kafkaTemplate.send(topic, content);} }@Component //一旦服務啟動了,spring就會去監聽 topic ,有一個線程會阻塞,一直會嘗試讀取消息,但是 阻塞的狀態,如果沒有消息,沒有主題她就會阻塞在這里。一旦有了topic(主題),她就會交給她所修飾的這個方法去讀取。 class KafkaConsumer { //******消費者@KafkaListener(topics = {"test"}) //監聽的 topicpublic void handleMessage(ConsumerRecord record) { //在調取這個方法的時候會對這個消息進行封裝,通過record對象,就能讀到原始的消息。System.out.println(record.value());} }12 項目實戰
public class Event {private String topic;private int userId;private int entityType;private int entityId;private int entityUserId;private Map<String, Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key, Object value) {this.data.put(key, value);return this;} } @Component public class EventProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;// 處理事件public void fireEvent(Event event) {// 將事件發布到指定的主題kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));} }總結
以上是生活随笔為你收集整理的kafka 消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 神兔侠儿童安全预警平台正式发布,互联网将
- 下一篇: 【Linux】修改Xshell 7默认配