Kafka 消费者组 Rebalance 详解
Rebalance作用
Rebalance 本質上是一種協議,主要作用是為了保證消費者組(Consumer Group)下的所有消費者(Consumer)消費的主體分區達成均衡。
比如:我們有10個分區,當我們有一個消費者時,該消費者消費10個分區,當我們增加一個消費者,理論上每個消費者消費5個分區,這個分配的過程我們成為Rebalance(重平衡)
?
觸發條件
常見的有三種情況會觸發Rebalance:
- 組成員數發生變更
- 訂閱主題數發生變更
- 訂閱主題的分區數發生變更
?
缺點
- Rebalance時所有消費者無法消費數據
- Rebalance速度慢
- Rebalance 效率不高
Coordinator(協調者)介紹
Consumer 端應用程序在提交位移時,其實是向 Coordinator 所在的 Broker 提交位移。同樣地,當 Consumer 應用啟動時,也是向 Coordinator 所在的 Broker 發送各種請求,然后由 Coordinator 負責執行消費者組的注冊、成員管理記錄等元數據管理操作。
所有 Broker 在啟動時,都會創建和開啟相應的 Coordinator 組件。也就是說,所有 Broker 都有各自的 Coordinator 組件。
?
如何避免 Rebalance
最簡單粗暴的就是 : 減少組成員數量發生變化
每個 Consumer 實例都會定期地向 Coordinator 發送心跳請求,表明它還存活著。如果某個 Consumer 實例不能及時地發送這些心跳請求,Coordinator 就會認為該 Consumer 已經“死”了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。Consumer 端有個參數,叫 session.timeout.ms,就是被用來表征此事的。該參數的默認值是 10 秒,即如果 Coordinator 在 10 秒之內沒有收到 Group 下某 Consumer 實例的心跳,它就會認為這個 Consumer 實例已經掛了。可以這么說,session.timout.ms 決定了 Consumer 存活性的時間間隔。
除了這個參數,Consumer 還提供了一個允許你控制發送心跳請求頻率的參數,就是 heartbeat.interval.ms。這個值設置得越小,Consumer 實例發送心跳請求的頻率就越高。頻繁地發送心跳請求會額外消耗帶寬資源,但好處是能夠更加快速地知曉當前是否開啟 Rebalance,因為,目前 Coordinator 通知各個 Consumer 實例開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標志封裝進心跳請求的響應體中。
除了以上兩個參數,Consumer 端還有一個參數,用于控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms 參數。它限定了 Consumer 端應用程序兩次調用 poll 方法的最大時間間隔。它的默認值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內無法消費完 poll 方法返回的消息,那么 Consumer 會主動發起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。
?
觸發Rebalance時Broker服務端與Consumer客戶端日志
增加消費者時
當增加一個消費者進程時,broker server.log中GroupCoordinator 打印日志如下
[2020-03-28 23:03:59,453] INFO [GroupCoordinator 0]: Preparing to rebalance group test-consumer in state PreparingRebalance with old generation 7 (__consumer_offsets-23) (reason: Adding new member consumer-1-b1c600af-a17a-465d-9b43-01fd82df0b8f with group instanceid None) (kafka.coordinator.group.GroupCoordinator) [2020-03-28 23:04:02,005] INFO [GroupCoordinator 0]: Stabilized group test-consumer generation 8 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator) [2020-03-28 23:04:02,008] INFO [GroupCoordinator 0]: Assignment received from leader for group test-consumer for generation 8 (kafka.coordinator.group.GroupCoordinator)在Consumer客戶端Debug日志中有以下信息提示,說明已經該group產生了Rebalance
23:04:07,379 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JoinGroup ((type: JoinGroupRequest, groupId=test-consumer, sessionTimeout=10000, rebalanceTimeout=10000, memberId=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@442675e1)) to coordinator DESKTOP-I0EG1MJ.localdomain:9092 (id: 2147483647 rack: null) 23:04:07,379 DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JOIN_GROUP {group_id=test-consumer,session_timeout=10000,rebalance_timeout=10000,member_id=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]} with correlation id 146 to node 2147483647減少消費者時
當減少一個消費者組的進程時,broker server.log中GroupCoordinator 打印日志如下
[2020-03-28 23:04:39,367] INFO [GroupCoordinator 0]: Member consumer-1-b1c600af-a17a-465d-9b43-01fd82df0b8f in group test-consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2020-03-28 23:04:39,367] INFO [GroupCoordinator 0]: Preparing to rebalance group test-consumer in state PreparingRebalance with old generation 8 (__consumer_offsets-23) (reason: removing member consumer-1-b1c600af-a17a-465d-9b43-01fd82df0b8f on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator) [2020-03-28 23:04:43,765] INFO [GroupCoordinator 0]: Stabilized group test-consumer generation 9 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator) [2020-03-28 23:04:43,768] INFO [GroupCoordinator 0]: Assignment received from leader for group test-consumer for generation 9 (kafka.coordinator.group.GroupCoordinator)在未停止的Consumer客戶端Debug日志中有以下信息提示,說明已經該group產生了Rebalance
23:04:56,507 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JoinGroup ((type: JoinGroupRequest, groupId=test-consumer, sessionTimeout=10000, rebalanceTimeout=10000, memberId=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@49e202ad)) to coordinator DESKTOP-I0EG1MJ.localdomain:9092 (id: 2147483647 rack: null) 23:24:56,507 DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JOIN_GROUP {group_id=test-consumer,session_timeout=10000,rebalance_timeout=10000,member_id=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]} with correlation id 321 to node 2147483647注意:注意對于不同topic,使用相同consumer group,如果有一個消費者程序停止或新增,所有相同consumer group都會Rebalance
?
所以在我們日常開發中,不想干的業務也要避免Consumer Group 設置成不相同的
?
Rebalance 學習思維導圖
下面是我大概整理的Rebalance的知識點參考《Kafka核心技術與實戰》
?
?
總結
以上是生活随笔為你收集整理的Kafka 消费者组 Rebalance 详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Yarn 监控 - 监控任务运行状态 (
- 下一篇: 面向对象设计之CRC卡片