Kafka一些参数配置
Producer消息發送
producer.send(msg); // 用類似這樣的方式去發送消息,就會把消息給你均勻的分布到各個分區上去
producer.send(key, msg); // 訂單id,或者是用戶id,他會根據這個key的hash值去分發到某個分區上去,他可以保證相同的key會路由分發到同一個分區上去。
每次發送消息都必須先把數據封裝成一個ProducerRecord對象,里面包含了要發送的topic,具體在哪個分區,分區key,消息內容,timestamp時間戳,然后這個對象交給序列化器,變成自定義協議格式的數據,接著把數據交給partitioner分區器,對這個數據選擇合適的分區,默認就輪詢所有分區,或者根據key來hash路由到某個分區,這個topic的分區信息,都是在客戶端會有緩存的,當然會提前跟broker去獲取。接著這個數據會被發送到producer內部的一塊緩沖區里,然后producer內部有一個Sender線程,會從緩沖區里提取消息封裝成一個一個的batch,然后每個batch發送給分區的leader副本所在的broker。
常見異常處理
常見的異常如下:
1)LeaderNotAvailableException:某臺機器掛了,此時leader副本不可用,會導致你寫入失敗,要等待其他follower副本切換為leader副本之后,才能繼續寫入,此時可以重試發送即可。
2)NotControllerException:這個也是同理,如果說Controller所在Broker掛了,那么此時會有問題,需要等待Controller重新選舉,此時也是一樣就是重試即可
3)NetworkException:網絡異常,重試即可
參數:retries 默認值是3
參數:retry.backoff.ms 兩次重試之間的時間間隔
提升消息吞吐量
1)buffer.memory:設置發送消息的緩沖區,默認值是33554432,就是32MB
如果發送消息出去的速度小于寫入消息進去的速度,就會導致緩沖區寫滿,此時生產消息就會阻塞住,所以說這里就應該多做一些壓測,盡可能保證說這塊緩沖區不會被寫滿導致生產行為被阻塞住
producer.send(record, new Callback() {
@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null) {// 消息發送成功System.out.println("消息發送成功"); } else {// 消息發送失敗,需要重新發送}}});Long endTime=System.currentTime();If(endTime - startTime > 100){//說明內存被壓滿了說明有問題}
2)compression.type,默認是none,不壓縮,但是也可以使用lz4壓縮,效率還是不錯的,壓縮之后可以減小數據量,提升吞吐量,但是會加大producer端的cpu開銷
3)batch.size,設置meigebatch的大小,如果batch太小,會導致頻繁網絡請求,吞吐量下降;如果batch太大,會導致一條消息需要等待很久才能被發送出去,而且會讓內存緩沖區有很大壓力,過多數據緩沖在內存里
默認值是:16384,就是16kb,也就是一個batch滿了16kb就發送出去,一般在實際生產環境,這個batch的值可以增大一些來提升吞吐量,可以自己壓測一下
4)linger.ms,這個值默認是0,意思就是消息必須立即被發送,但是這是不對的,一般設置一個100毫秒之類的,這樣的話就是說,這個消息被發送出去后進入一個batch,如果100毫秒內,這個batch滿了16kb,自然就會發送出去。但是如果100毫秒內,batch沒滿,那么也必須把消息發送出去了,不能讓消息的發送延遲時間太長,也避免給內存造成過大的一個壓力。
請求超時
1)max.request.size:這個參數用來控制發送出去的消息的大小,默認是1048576字節,也就1mb,這個一般太小了,很多消息可能都會超過1mb的大小,所以需要自己優化調整,把他設置更大一些(企業一般設置成10M)
2)request.timeout.ms:這個就是說發送一個請求出去之后,他有一個超時的時間限制,默認是30秒,如果30秒都收不到響應,那么就會認為異常,會拋出一個TimeoutException來讓我們進行處理
ACK參數
acks參數,其實是控制發送出去的消息的持久化機制的
1)如果acks=0,那么producer根本不管寫入broker的消息到底成功沒有,發送一條消息出去,立馬就可以發送下一條消息,這是吞吐量最高的方式,但是可能消息都丟失了,你也不知道的,但是說實話,你如果真是那種實時數據流分析的業務和場景,就是僅僅分析一些數據報表,丟幾條數據影響不大的。會讓你的發送吞吐量會提升很多,你發送弄一個batch出,不需要等待人家leader寫成功,直接就可以發送下一個batch了,吞吐量很大的,哪怕是偶爾丟一點點數據,實時報表,折線圖,餅圖。
2)acks=all,或者acks=-1:這個leader寫入成功以后,必須等待其他ISR中的副本都寫入成功,才可以返回響應說這條消息寫入成功了,此時你會收到一個回調通知
3)acks=1:只要leader寫入成功,就認為消息成功了,默認給這個其實就比較合適的,還是可能會導致數據丟失的,如果剛寫入leader,leader就掛了,此時數據必然丟了,其他的follower沒收到數據副本,變成leader
如果要想保證數據不丟失,得如下設置:
a)min.insync.replicas = 2,ISR里必須有2個副本,一個leader和一個follower,最最起碼的一個,不能只有一個leader存活,連一個follower都沒有了
b)acks = -1,每次寫成功一定是leader和follower都成功才可以算做成功,leader掛了,follower上是一定有這條數據,不會丟失
c) retries = Integer.MAX_VALUE,無限重試,如果上述兩個條件不滿足,寫入一直失敗,就會無限次重試,保證說數據必須成功的發送給兩個副本,如果做不到,就不停的重試,除非是面向金融級的場景,面向企業大客戶,或者是廣告計費,跟錢的計算相關的場景下,才會通過嚴格配置保證數據絕對不丟失
重試亂序
消息重試是可能導致消息的亂序的,因為可能排在你后面的消息都發送出去了,你現在收到回調失敗了才在重試,此時消息就會亂序,所以可以使用“max.in.flight.requests.per.connection”參數設置為1,這樣可以保證producer同一時間只能發送一條消息
Consumer架構
Offset管理
每個consumer內存里數據結構保存對每個topic的每個分區的消費offset,定期會提交offset,老版本是寫入zk,但是那樣高并發請求zk是不合理的架構設計,zk是做分布式系統的協調的,輕量級的元數據存儲,不能負責高并發讀寫,作為數據存儲。所以后來就是提交offset發送給內部topic:__consumer_offsets,提交過去的時候,key是group.id+topic+分區號,value就是當前offset的值,每隔一段時間,kafka內部會對這個topic進行compact。也就是每個group.id+topic+分區號就保留最新的那條數據即可。而且因為這個 __consumer_offsets可能會接收高并發的請求,所以默認分區50個,這樣如果你的kafka部署了一個大的集群,比如有50臺機器,就可以用50臺機器來抗offset提交的請求壓力,就好很多。
Coordinator
Coordinator的作用
每個consumer group都會選擇一個broker作為自己的coordinator,他是負責監控這個消費組里的各個消費者的心跳,以及判斷是否宕機,然后開啟rebalance,
根據內部的一個選擇機制,會挑選一個對應的Broker,Kafka總會把你的各個消費組均勻分配給各個Broker作為coordinator來進行管理的,consumer group中的每個consumer剛剛啟動就會跟選舉出來的這個consumer group對應的coordinator所在的broker進行通信,然后由coordinator分配分區給你的這個consumer來進行消費。coordinator會盡可能均勻的分配分區給各個consumer來消費。
如何選擇哪臺是coordinator
首先對消費組的groupId進行hash,接著對__consumer_offsets的分區數量取模,默認是50,可以通過offsets.topic.num.partitions來設置,找到你的這個consumer group的offset要提交到__consumer_offsets的哪個分區。比如說:groupId,“membership-consumer-group” -> hash值(數字)-> 對50取模 -> 就知道這個consumer group下的所有的消費者提交offset的時候是往哪個分區去提交offset,找到__consumer_offsets的一個分區,__consumer_offset的分區的副本數量默認來說1,只有一個leader,然后對這個分區找到對應的leader所在的broker,這個broker就是這個consumer group的coordinator了,consumer接著就會維護一個Socket連接跟這個Broker進行通信。
Rebalance策略
比如我們消費的一個主題有12個分區:
p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
假設我們的消費者組里面有三個消費者
1.range策略
range策略就是按照partiton的序號范圍
p0~3 consumer1
p4~7 consumer2
p8~11 consumer3
默認就是這個策略;
2.round-robin策略
consumer1:0,3,6,9
consumer2:1,4,7,10
consumer3:2,5,8,11
但是前面的這兩個方案有個問題:
假設consuemr1掛了:p0-5分配給consumer2,p6-11分配給consumer3
這樣的話,原本在consumer2上的的p6,p7分區就被分配到了 consumer3上。
3.sticky策略
最新的一個sticky策略,就是說盡可能保證在rebalance的時候,讓原本屬于這個consumer
的分區還是屬于他們,
然后把多余的分區再均勻分配過去,這樣盡可能維持原來的分區分配的策略
consumer1:0-3
consumer2: 4-7
consumer3: 8-11
假設consumer3掛了
consumer1:0-3,+8,9
consumer2: 4-7,+10,11
Rebalance分代機制
在rebalance的時候,可能你本來消費了partition3的數據,結果有些數據消費了還沒提交offset,結果此時rebalance,把partition3分配給了另外一個cnosumer了,此時你如果提交partition3的數據的offset,能行嗎?必然不行,所以每次rebalance會觸發一次consumer group generation,分代,每次分代會加1,然后你提交上一個分代的offset是不行的,那個partiton可能已經不屬于你了,大家全部按照新的partiton分配方案重新消費數據。
Consumer核心參數
【heartbeat.interval.ms】
consumer心跳時間,必須得保持心跳才能知道consumer是否故障了,然后如果故障之后,就會通過心跳下發rebalance的指令給其他的consumer通知他們進行rebalance的操作
【session.timeout.ms】
kafka多長時間感知不到一個consumer就認為他故障了,默認是10秒
【max.poll.interval.ms】
如果在兩次poll操作之間,超過了這個時間,那么就會認為這個consume處理能力太弱了,會被踢出消費組,分區分配給別人去消費,一遍來說結合你自己的業務處理的性能來設置就可以了
【fetch.max.bytes】
獲取一條消息最大的字節數,一般建議設置大一些
【max.poll.records】
一次poll返回消息的最大條數,默認是500條
【connection.max.idle.ms】
consumer跟broker的socket連接如果空閑超過了一定的時間,此時就會自動回收連接,但是下次消費就要重新建立socket連接,這個建議設置為-1,不要去回收
【auto.offset.reset】
earliest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
topica -> partition0:1000
partitino1:2000
latest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從當前位置開始消費
none
topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
【enable.auto.commit】
這個就是開啟自動提交唯一
【auto.commit.ineterval.ms】
這個指的是多久條件一次偏移量
總結
以上是生活随笔為你收集整理的Kafka一些参数配置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里京东滴滴等大厂面试题汇总
- 下一篇: TextRank、BM25算法提取关键字