kafka控制台模拟消费_Kafka 详解
kafka簡介
Kafka是最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基于zookeeper協調的分布式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基于hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日志、訪問日志,消息服務等等,用scala語言編寫,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源 項目。
關鍵詞
- 分布式流處理平臺。
- 在系統之間構建實時數據流管道。
- 以topic分類對記錄進行存儲
- 每個記錄包含key-value+timestamp
- 每秒鐘百萬消息吞吐量。
安裝kafka
0.選擇三臺主機安裝kafka 1.準備zk 略 2.jdk 略 3.tar文件 4.環境變量 略 5.配置kafka [kafka/config/server.properties] ... broker.id=201 ... listeners=PLAINTEXT://:9092 ... log.dirs=/home/centos/kafka/logs ... zookeeper.connect=s201:2181,s202:2181,s203:21816.分發server.properties,同時修改每個文件的broker.id7.啟動kafka服務器 a)先啟動zk b)啟動kafka [s202 ~ s204] $>bin/kafka-server-start.sh -daemon config/server.propertiesc)驗證kafka服務器是否啟動 $>netstat -anop | grep 90928.創建主題 $>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test9.查看主題列表 $>bin/kafka-topics.sh --list --zookeeper s201:218110.啟動控制臺生產者 $>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test11.啟動控制臺消費者 $>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:218112.在生產者控制臺輸入hello worldkafka 的使用場景
- 埋點日志的收集一個公司可以用Kafka可以收集各種服務的log。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘
- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
- 流式處理:比如spark streaming和flink
- 事件源
kafka如何保證的消息數據不丟失
當討論這個問題的時候,首先需要考量kafka的運行機制。kafka主要分為三個組件,producer、consumer、broker。所以也必須從三個方面去考量,producer、consumer、broker端數據不丟失。
一、producer端如何保證數據不丟失
1.ack的配配置策略
acks = 0 生產者發送消息之后 不需要等待服務端的任何響應,它不管消息有沒有發送成功,如果發送過程中遇到了異常, 導致broker端沒有收到消息,消息也就丟失了。實際上它只是把消息發送到了socketBuffer(緩存)中, 而socketBuffer什么時候被提交到broker端并不關心,它不擔保broker端是否收到了消息, 但是這樣的配置對retry是不起作用的,因為producer端都不知道是否發生了錯誤, 而且對于offset的獲取永遠都是-1,因為broker端可能還沒有開始寫數據。 這樣不保險的操作為什么還有這樣的配置?kafka對于收集海量數據, 如果在收集某一項日志時是允許數據量有一定丟失的話,是可以用這種配置來收集日志。 acks = 1(默認值) 生產者發送消息之后,只要分區的leader副本成功寫入消息,那么它就會收到來自服務端的成功響應。 其實就是消息只發給了leader leader收到消息后會返回ack到producer端。 如果消息無法寫入leader時(選舉、宕機等情況時),生產都會收到一個錯誤的響應,為了避免消息丟失, 生產者可以選擇重發消息,如果消息成功寫入,在被其它副本同步數據時leader 崩潰,那么此條數據 還是會丟失,因為新選舉的leader是沒有收到這條消息,ack設置為1是消息可靠性和吞吐量折中的方案。 acks = all (或-1) 生產者在發送消息之后,需要等待ISR中所有的副本都成功寫入消息之后才能夠收到來自服務端的成功響應, 在配置環境相同的情況下此種配置可以達到最強的可靠性。即:在發送消息時,需要leader 向fllow 同步完數據之后,也就是ISR隊列中所有的broker全部保存完這條消息后,才會向ack發送消息,表示發送成功。2.retries的配置策略
在kafka中錯誤分為2種,一種是可恢復的,另一種是不可恢復的。
- 可恢復性的錯誤:
如遇到在leader的選舉、網絡的抖動等這些異常時,如果我們在這個時候配置的retries大于0的, 也就是可以進行重試操作,那么等到leader選舉完成后、網絡穩定后,這些異常就會消息,錯誤也就可以恢復, 數據再次重發時就會正常發送到broker端。需要注意retries(重試)之間的時間間隔, 以確保在重試時可恢復性錯誤都已恢復。
- 不可恢復性的錯誤:
如:超過了發送消息的最大值(max.request.size)時,這種錯誤是不可恢復的,如果不做處理, 那么數據就會丟失,因此我們需要注意在發生異常時把這些消息寫入到DB、緩存本地文件中等等, 把這些不成功的數據記錄下來,等錯誤修復后,再把這些數據發送到broker端。
如何選擇
高可用型配置:
acks = all,retries > 0 retry.backoff.ms=100(毫秒) (并根據實際情況設置retry可能恢復的間隔時間)
- 優點:這樣保證了producer端每發送一條消息都要成功,如果不成功并將消息緩存起來,等異常恢復后再次發送。
- 缺點:這樣保證了高可用,但是這會導致集群的吞吐量不是很高,因為數據發送到broker之后,leader要將數據同步到fllower上,如果網絡帶寬、不穩定等情況時,ack響應時間會更長
折中型配置:
acks = 1 retries > 0 retries 時間間隔設置 (并根據實際情況設置retries可能恢復的間隔時間)
- 優點:保證了消息的可靠性和吞吐量,是個折中的方案
- 缺點:性能處于2者中間3.高吞吐型
高效率配置:
acks = 0
- 優點:可以相對容忍一些數據的丟失,吞吐量大,可以接收大量請求
- 缺點:不知道發送的消息是 否成功
每種配置都有對應的生產用途,視情況而定。。
二、consumer端如何保證數據不丟失
consumer端配置
1、group.id: consumer group 分組的一個id
消費者隸屬消費組的名稱,kafka的每個partition值允許同一個group的一個consumer消費。這樣做的目的是為了保證kafka的高吞吐量
2、auto.offset.reset = earliest(最早) /latest(最晚)
設置從哪個位置開始消費
3、enable.auto.commit = true/false(默認true)
當設置為true時,意味著由kafka的consumer端自己間隔一定的時間會自動提交
offset,如果設置成了fasle,也就是由客戶端(自己寫代碼)來提交,那就還得控制提交的時間間隔
auto.commit.interval.ms
當enabe.auto.commit設置為true時才生效,表示開啟自動提交消費位移功能時自動提交消費位移的時間間隔。
在consumer階段,如果設置為true,意味著會自動提交offset,比如說當你pull了30條數據,但是當處理20條數據的時候自動提交了commit,當處理21條數據的時候,系統崩了,那當你再去拉取數據的時候,就會從30開始啦,那就會丟失21-30的數據
如果設置為false,可以手動提交,你可以處理一條提交一次,也可以處理一批提交一批,但是consumer在消費數據的時候,是以batch的模式去pull數據的,假設pull了30條數據,你在處理30條數據的時候,沒處理一條,就提交一次的話,會非常影響消費能力,你可以還是按照一批來處理,設置一個累加器,處理一條加1,如果在處理數據時發生了異常,那就把當前處理失敗的offset進行提交(放在finally代碼塊中)注意一定要確保offset的正確性,當下次再次消費的時候就可以從提交的offset處進行再次消費。
consumer 保證確保消息只被處理一次處理,同時確保冪等性
需要結合具體的業務來看 :
- 比如你拿個數據要寫庫,先根據主鍵查一下,如果這數據都有了,你就別插入了,update一下好吧
- 比如你是寫redis,那沒問題了,反正每次都是set,天然冪等性
- 比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產者發送每條數據的時候,里面加一個全局唯一的id,類似訂單id之類的東西,然后你這里消費到了之后,先根據這個id去比如redis里查一下,之前消費過嗎?如果沒有消費過,你就處理,然后這個id寫redis。如果消費過了,那你就別處理了,保證別重復處理相同的消息即可。
- 還有比如基于數據庫的唯一鍵來保證重復數據不會重復插入多條,拿到數據的時候,每次重啟可能會有重復,因為kafka消費者還沒來得及提交offset,重復數據拿到了以后我們插入的時候,因為有唯一鍵約束了,所以重復數據只會插入報錯,不會導致數據庫中出現臟數據
三、broker端是如何保證數據不丟失的
1.replication-factor 3
在創建topic時會通過replication-factor來創建副本的個數,它提高了kafka的高可用性,同時,它允許n-1臺broker掛掉,設置好合理的副本因子對kafka整體性能是非常有幫助的,通常是3個,極限是5個,如果多了也會影響開銷。
2.min.insync.replicas = 2
分區ISR隊列集合中最少有多少個副本,默認值是1
3.unclean.leader.election.enable = false
是否允許從ISR隊列中選舉leader副本,默認值是false,如果設置成true,則可能會造成數據丟失。
leader選舉造成的數據丟失
3個replica分別為0 1 2,0為leader,數據都能完全同步到100,在某一時刻,分別有2個fllow掛掉了,此時有producer往0 的replica上發送50條數據完后,此時的leader掛掉了,而此時剛好的1個fllow起來了,它沒有向leader上feach數據,因為leader已經不存在了,此時有2種處理方法:重新起來的fllow可以成為1個leader,需要通過 unclean.leader.election.enable=true,這樣做保證了高可用,但是這樣做的弊端是:新起來的fllow成為了leader,但是它會丟失部分數據,雖然這樣保證了高可用。另一種情況是設置為false,不讓fllow競選leader,但是這樣也會造成數據的丟失。假如在ISR的隊列里面,只有0 1,但此時replica 1 沒有來得及向leader feach數據leader掛掉了,這樣也會造成數據的丟失。
broker配置策略
- min.insync.replica
在一個topic中,1個分區 有3個副本,在創建時設置了min.insync.replica=2,假如此時在ISR中只有leader副本(1個)存在,在producer端生產數據時,此時的acks=all,這也就意味著在producer向broker端寫數據時,必須保證ISR中指定數量的副本(包含leader、fllow副本)全部同步完成才算寫成功,這個數量就是由min.insync.replica來控制的,這樣producer端向broker端寫數據是不成功,因為ISR中只有leader副本,min.insync.replica要求2個副本,此時的producer生產數據失敗(異常),當然consumer端是可以消費數據的,只不過是沒有新數據產生而已.這樣保證了數據的一致性,但這樣會導致高可用性降低了。一般的配置是按: n/2 +1 來配置min.insync.replicas 的數量的,
同時也要將unclean.leader.election.enable=false
- unclean.leader.election.enable
假如現在有leader 0 fllow 1 fllow 2 三個副本,存儲的數據量分別是10 9 8,此時的broker的配置是:min.insync.replica=2 acks=all,leader的數據更新到了15,在沒有同步到fllow 1 fllow 2時掛掉了,此時的ISR隊列中是有fllow 1 和fllow 2的,如果unclean.leader.election.enable設置的是true,表示在ISR中的副本是可以競選leader這樣就會造成9-15或8-15之間的數據丟失,所以unclean.leader.election.enable必須設置成成false,這樣整個kafka cluster都不讀寫了,這樣就保證了數據的高度一致性.
kafka中topic設計原理
因為consumer group 中所有的consumer一定會消費topic中的partition,而一個partition只能同時被同一group中的一個consumer消費;
所以最優的設計就是:
- consumer group下的consumer thread的數量等于partition數量,這樣效率是最高的。
- 一個Topic的Partition數量大于等于Broker的數量,可以提高吞吐率
參考文章
https://www.cnblogs.com/MrRightZhao/p/11498952.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的kafka控制台模拟消费_Kafka 详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 服务器安全维护包含,服务器安全维护包含
- 下一篇: 统信uos系统考试题_148款!富士通及