Reading the Kafka Producer Message-Sending Source Code
Today I read through the message-sending part of the Kafka source code (version 0.8.2.1). The producer's partitioning strategy is as follows:
1 Kafka's partitioning strategy
1.1 If a partition is specified, the message is sent to that partition.
1.2 If no partition is specified but a key is, a partition is chosen from the hash of the key. With a fixed key, every message lands on the same partition, so do not set the key to a constant value; if you have to, consider modifying the Kafka source so that the data is still spread evenly across partitions.
1.3 If neither partition nor key is specified, messages are distributed across partitions in round-robin fashion (a simplified sketch of this selection logic follows the list).
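Below is a minimal, self-contained sketch of the three rules above. The class and method names are illustrative, not the actual Kafka classes; in the real producer this logic lives in an internal partitioner, which hashes the key bytes with murmur2.

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionChooser {
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Picks a partition index in [0, numPartitions) following the three rules above. */
    public int choosePartition(Integer explicitPartition, byte[] key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;                                   // rule 1.1: an explicit partition wins
        }
        if (key != null) {
            return toPositive(Arrays.hashCode(key)) % numPartitions;    // rule 1.2: hash of the key
        }
        return toPositive(counter.getAndIncrement()) % numPartitions;   // rule 1.3: round-robin
    }

    // keep the modulo operand non-negative (Math.abs(Integer.MIN_VALUE) is still negative)
    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }
}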
2 Message sending is asynchronous. The send path involves three objects:
2.1 RecordAccumulator
It maintains a ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches: each partition maps to an ArrayDeque of RecordBatch objects.
Calling KafkaProducer.send() ultimately appends the record to the RecordAccumulator. If the current RecordBatch is already full, or a new RecordBatch had to be created, the Sender thread is woken up; a simplified sketch of this path is shown below.
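The following self-contained sketch (illustrative types, not the real Kafka classes) models both the batches map and the wake-up condition: the Sender is woken when an append fills a batch or a new batch has to be created.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MiniAccumulator {
    static final int BATCH_SIZE = 16384;    // bytes, mirrors the batch.size setting

    static class Batch {
        int usedBytes;
        boolean tryAppend(byte[] payload) {
            if (usedBytes + payload.length > BATCH_SIZE) return false;
            usedBytes += payload.length;
            return true;
        }
        boolean isFull() { return usedBytes >= BATCH_SIZE; }
    }

    // one deque of batches per partition, like ConcurrentMap<TopicPartition, Deque<RecordBatch>>
    private final Map<Integer, Deque<Batch>> batches = new ConcurrentHashMap<>();

    /** Appends a record; returns true if the Sender should be woken up. */
    public boolean append(int partition, byte[] payload) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(payload)) {
                return last.isFull();       // wake the Sender only if this append filled the batch
            }
            Batch fresh = new Batch();      // existing batch full (or none yet): create a new batch
            fresh.tryAppend(payload);       // (oversized records are ignored here; the real producer sizes the batch to fit)
            dq.addLast(fresh);
            return true;                    // a new batch was created, so wake the Sender
        }
    }
}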
2.2 Sender
Its Javadoc describes it as "the background thread that handles the sending of produce requests to the Kafka cluster".
The Sender writes the data accumulated in the RecordAccumulator to the servers in batches through the KafkaClient.
Its run() method is structured roughly as follows:
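A minimal, self-contained sketch of that structure (illustrative names, not the real class): an outer loop that performs one unit of work per pass until the producer is closed.

public class SenderSketch implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            runOnce(System.currentTimeMillis());   // one pass: steps 2.2.1 to 2.2.5 below
        }
        // the real Sender also waits for in-flight requests to finish on close (omitted here)
    }

    private void runOnce(long now) {
        // 2.2.1 find nodes with sendable data, 2.2.2 drop nodes that are not ready,
        // 2.2.3 drain batches per node, 2.2.4 send produce requests, 2.2.5 handle responses
    }

    public void initiateClose() { running = false; }
}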
One iteration, run(long now), proceeds as follows:
2.2.1 First, determine which nodes have data ready to be sent, using the conditions sketched below.
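A paraphrase of the readiness test (parameter names are illustrative, reconstructed from the RecordAccumulator.ready() logic rather than quoted verbatim): a partition's leader node counts as ready when the oldest batch is full, has waited at least linger.ms, the buffer pool is exhausted, or the producer is closing, provided the batch is not backing off after a failed send.

class ReadyCheck {
    static boolean partitionReady(boolean batchFull, long waitedMs, long lingerMs,
                                  boolean bufferExhausted, boolean closing, boolean backingOff) {
        boolean expired = waitedMs >= lingerMs;                 // lingered long enough
        return !backingOff && (batchFull || expired || bufferExhausted || closing);
    }
}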
2.2.2 Remove from that set any Kafka node that cannot be sent to right now (a sketch of this filtering follows).
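A paraphrase of the filtering step, with a stand-in interface in place of KafkaClient: nodes whose connections are not usable are dropped, and the shortest reconnect delay is remembered so it can feed into the poll timeout later.

import java.util.Iterator;
import java.util.Set;

class ReadyNodeFilter {
    interface Client {                             // stand-in for KafkaClient
        boolean ready(String node, long now);
        long connectionDelay(String node, long now);
    }

    /** Removes nodes that cannot be sent to and returns the shortest delay until one could be. */
    static long removeNotReady(Set<String> readyNodes, Client client, long now) {
        long notReadyTimeout = Long.MAX_VALUE;
        Iterator<String> iter = readyNodes.iterator();
        while (iter.hasNext()) {
            String node = iter.next();
            if (!client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, client.connectionDelay(node, now));
            }
        }
        return notReadyTimeout;
    }
}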
2.2.3 Collect the data to send:
For each ready node, loop over the partitions whose leader is on that node; for every such partition, take its RecordBatch and add it to that node's List<RecordBatch> (a simplified sketch of this drain step follows).
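A simplified, self-contained sketch of the drain step (illustrative types; the real RecordAccumulator.drain() additionally rotates its starting partition for fairness):

import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DrainSketch {
    /** For each node, pull the oldest batch of every partition it leads into that node's send list. */
    static Map<Integer, List<byte[]>> drain(Map<Integer, Deque<byte[]>> batchesByPartition,
                                            Map<Integer, List<Integer>> partitionsLedByNode,
                                            int maxBytesPerNode) {
        Map<Integer, List<byte[]>> perNode = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> entry : partitionsLedByNode.entrySet()) {
            int size = 0;
            List<byte[]> ready = new ArrayList<>();
            for (int partition : entry.getValue()) {           // partitions whose leader is this node
                Deque<byte[]> dq = batchesByPartition.get(partition);
                if (dq == null || dq.isEmpty()) continue;
                byte[] oldest = dq.peekFirst();
                if (size + oldest.length > maxBytesPerNode && !ready.isEmpty()) break;
                ready.add(dq.pollFirst());                     // the oldest batch joins this node's list
                size += oldest.length;
            }
            perNode.put(entry.getKey(), ready);
        }
        return perNode;
    }
}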
2.2.4 Assemble produce requests and send them to the different Kafka nodes.
The poll timeout is computed and the request objects are handed to the client:
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    log.trace("Created {} produce requests: {}", requests.size(), requests);
    pollTimeout = 0;
}

// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);

2.2.5 Handle the responses that come back

for (ClientResponse response : responses) {
    if (response.wasDisconnected())
        handleDisconnect(response, now);
    else
        handleResponse(response, now);
}

2.3 KafkaClient
Its implementation class is NetworkClient, which exchanges data with the servers over sockets.
3 Kafka producer configuration
batch.size: the buffer used to hold one batch of records (backed by MemoryRecords); default 16384 bytes.
buffer.memory: the total memory the client uses to buffer all records waiting to be sent (backed by BufferPool); default 32 * 1024 * 1024, i.e. 32 MB.
linger.ms: an artificial send delay; if set to 1 second, records sit in the client buffer and are sent after waiting up to 1 second. The default is 0, meaning send immediately.
compression.type: the compression codec; one of none, gzip, snappy, lz4. The default is none.
In theory, raising linger.ms increases message throughput, since more records are batched per request (see the example configuration below).
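An example pulling these settings together; the broker address localhost:9092 and the topic name test-topic are illustrative assumptions, and the values are the defaults quoted above except linger.ms, which is raised slightly to show batching.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("batch.size", 16384);                      // per-partition batch buffer (MemoryRecords)
        props.put("buffer.memory", 32 * 1024 * 1024);        // total client-side buffer (BufferPool), 32 MB
        props.put("linger.ms", 5);                           // wait up to 5 ms to batch more records (default 0)
        props.put("compression.type", "none");               // none, gzip, snappy or lz4

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<byte[], byte[]>("test-topic", "hello".getBytes()));
        }
    }
}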
Reposted from: https://my.oschina.net/cloudcoder/blog/917309