2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
?
目錄
Kafka快速回顧
?消息隊列:
?發(fā)布/訂閱模式:
Kafka 重要概念:
常用命令
整合說明
兩種方式
兩個版本API
在實際項目中,無論使用Storm還是SparkStreaming與Flink,主要從Kafka實時消費(fèi)數(shù)據(jù)進(jìn)行處理分析,流式數(shù)據(jù)實時處理技術(shù)架構(gòu)大致如下:
?
技術(shù)棧: Flume/SDK/Kafka Producer API ?-> KafKa ?—> ?SparkStreaming/Flink/Storm(Hadoop YARN) -> Redis ?-> UI
1)、阿里工具Canal:監(jiān)控MySQL數(shù)據(jù)庫binlog文件,將數(shù)據(jù)同步發(fā)送到Kafka Topic中https://github.com/alibaba/canalhttps://github.com/alibaba/canal/wiki/QuickStart2)、Maxwell:實時讀取MySQL二進(jìn)制日志binlog,并生成 JSON 格式的消息,作為生產(chǎn)者發(fā)送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平臺的應(yīng)用程序。http://maxwells-daemon.io/https://github.com/zendesk/maxwell
擴(kuò)展:Kafka 相關(guān)常見面試題:
1)、Kafka 集群大小(規(guī)模),Topic分區(qū)函數(shù)名及集群配置?2)、Topic中數(shù)據(jù)如何管理?數(shù)據(jù)刪除策略是什么?3)、如何消費(fèi)Kafka數(shù)據(jù)?4)、發(fā)送數(shù)據(jù)Kafka Topic中時,如何保證數(shù)據(jù)發(fā)送成功?
?
Apache Kafka: 最原始功能【消息隊列】,緩沖數(shù)據(jù),具有發(fā)布訂閱功能(類似微信公眾號)。
Kafka快速回顧
Kafka 是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue),主要應(yīng)用與大數(shù)據(jù)實時處理領(lǐng)域。
?消息隊列:
Kafka 本質(zhì)上是一個 MQ(Message Queue),使用消息隊列的好處?(面試會問)
- 解耦:允許我們獨(dú)立的擴(kuò)展或修改隊列兩邊的處理過程;
- 可恢復(fù)性:即使一個處理消息的進(jìn)程掛掉,加入隊列中的消息仍可以在系統(tǒng)恢復(fù)后被處理;
- 緩沖:有助于解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況;
- 靈活性&峰值處理能力:不會因為突發(fā)的超負(fù)荷的請求而完全崩潰,消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力;
- 異步通信:消息隊列允許用戶把消息放入隊列但不立即處理它;
?發(fā)布/訂閱模式:
?
?
一對多,生產(chǎn)者將消息發(fā)布到 Topic 中,有多個消費(fèi)者訂閱該主題,發(fā)布到 Topic 的消息會被所有訂閱者消費(fèi),被消費(fèi)的數(shù)據(jù)不會立即從 Topic 清除。
Kafka 框架架構(gòu)圖如下所示:
?
Kafka 存儲的消息來自任意多被稱為 Producer 生產(chǎn)者的進(jìn)程,數(shù)據(jù)從而可以被發(fā)布到不同的 Topic 主題下的不同 Partition 分區(qū)。在一個分區(qū)內(nèi),這些消息被索引并連同時間戳存儲在一起。其它被稱為 Consumer 消費(fèi)者的進(jìn)程可以從分區(qū)訂閱消息。Kafka 運(yùn)行在一個由一臺或多臺服務(wù)器組成的集群上,并且分區(qū)可以跨集群結(jié)點(diǎn)分布。
Kafka 重要概念:
?1)、Producer: 消息生產(chǎn)者,向 Kafka Broker 發(fā)消息的客戶端;
?2)、Consumer:消息消費(fèi)者,從 Kafka Broker 取消息的客戶端;
?3)、Consumer Group:消費(fèi)者組(CG),消費(fèi)者組內(nèi)每個消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),提高消費(fèi)能力。一個分區(qū)只能由組內(nèi)一個消費(fèi)者消費(fèi),消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個消費(fèi)者組,即消費(fèi)者組是邏輯上的一個訂閱者;
?4)、Broker:一臺 Kafka 機(jī)器就是一個 Broker。一個集群由多個 Broker 組成。一個 Broker 可以容納多個 Topic;
?5)、Topic:可以理解為一個隊列,Topic 將消息分類,生產(chǎn)者和消費(fèi)者面向的是同一個 Topic;
?6)、Partition:為了實現(xiàn)擴(kuò)展性,提高并發(fā)能力,一個非常大的 Topic 可以分布到多個 Broker (即服務(wù)器)上,一個 Topic 可以分為多個 Partition,每個 Partition 是一個 有序的隊列;
?7)、Replica:副本,為實現(xiàn)備份的功能,保證集群中的某個節(jié)點(diǎn)發(fā)生故障時,該節(jié)點(diǎn)上的 Partition 數(shù)據(jù)不丟失,且 Kafka 仍然能夠繼續(xù)工作,Kafka 提供了副本機(jī)制,一個 Topic 的每個分區(qū)都有若干個副本,一個 Leader 和若干個 Follower;
?8)、Leader:每個分區(qū)多個副本的“主”副本,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對象,都是 Leader;
?9)、Follower:每個分區(qū)多個副本的“從”副本,實時從 Leader 中同步數(shù)據(jù),保持和 Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時,某個 Follower 還會成為新的 Leader;
?10)、Offset:消費(fèi)者消費(fèi)的位置信息,監(jiān)控數(shù)據(jù)消費(fèi)到什么位置,當(dāng)消費(fèi)者掛掉再重新恢復(fù)的時候,可以從消費(fèi)位置繼續(xù)消費(fèi);
?11)、Zookeeper:Kafka 集群能夠正常工作,需要依賴于 Zookeeper,Zookeeper 幫助 Kafka 存儲和管理集群信息;
?
常用命令
#啟動kafka/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties #停止kafka/export/server/kafka/bin/kafka-server-stop.sh #查看topic信息/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181#創(chuàng)建topic/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1?--partitions 3 --topic test#查看某個topic信息/export/server/kafka/bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test#刪除topic/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --delete --topic test#啟動生產(chǎn)者--控制臺的生產(chǎn)者--一般用于測試/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka# 啟動消費(fèi)者--控制臺的消費(fèi)者/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark_kafka --from-beginning
?
整合說明
兩種方式
Receiver-based Approach:
1.KafkaUtils.createDstream基于接收器方式,消費(fèi)Kafka數(shù)據(jù),已淘汰,企業(yè)中不再使用;
2.Receiver作為常駐的Task運(yùn)行在Executor等待數(shù)據(jù),但是一個Receiver效率低,需要開啟多個,再手動合并數(shù)據(jù)(union),再進(jìn)行處理,很麻煩;
3.Receiver那臺機(jī)器掛了,可能會丟失數(shù)據(jù),所以需要開啟WAL(預(yù)寫日志)保證數(shù)據(jù)安全,那么效率又會降低;
4.Receiver方式是通過zookeeper來連接kafka隊列,調(diào)用Kafka高階API,offset存儲在zookeeper,由Receiver維護(hù);
5.Spark在消費(fèi)的時候為了保證數(shù)據(jù)不丟也會在Checkpoint中存一份offset,可能會出現(xiàn)數(shù)據(jù)不一致;
?
Direct Approach (No Receivers):
1.KafkaUtils.createDirectStream直連方式,Streaming中每批次的每個job直接調(diào)用Simple Consumer API獲取對應(yīng)Topic數(shù)據(jù),此種方式使用最多,面試時被問的最多;
2.Direct方式是直接連接kafka分區(qū)來獲取數(shù)據(jù),從每個分區(qū)直接讀取數(shù)據(jù)大大提高并行能力
3.Direct方式調(diào)用Kafka低階API(底層API),offset自己存儲和維護(hù),默認(rèn)由Spark維護(hù)在checkpoint中,消除了與zk不一致的情況 ;
4.當(dāng)然也可以自己手動維護(hù),把offset存在MySQL/Redis中;
?
?
?
兩個版本API
Spark Streaming與Kafka集成,有兩套API,原因在于Kafka Consumer API有兩套,文檔:
http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html
?
1. Kafka 0.8.x版本 -早已淘汰
- 底層使用老的KafkaAPI:Old Kafka Consumer?API
- 支持Receiver(已淘汰)和Direct模式:
2.Kafka 0.10.x版本-開發(fā)中使用
- 底層使用新的KafkaAPI: New Kafka Consumer API
- 只支持Direct模式
?
?
?
?
?
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(四十一):S
- 下一篇: 2021年大数据Spark(四十三):S