带你了解下Kafka的客户端缓冲池技术
最近看kafka源碼,著實被它的客戶端緩沖池技術優雅到了。忍不住要寫篇文章贊美一下(哈哈)。
注:本文用到的源碼來自kafka2.2.2版本。
背景
當我們應用程序調用kafka客戶端 producer發送消息的時候,在kafka客戶端內部,會把屬于同一個topic分區的消息先匯總起來,形成一個batch。真正發往kafka服務器的消息都是以batch為單位的。如下圖所示:
這么做的好處顯而易見??蛻舳撕头斩送ㄟ^網絡通信,這樣批量發送可以減少網絡帶來的性能開銷,提高吞吐量。
這個Batch的管理就非常值得探討了??赡苡腥藭f,這不簡單嗎?用的時候分配一個塊內存,發送完了釋放不就行了嗎。
kafka是用java語言編寫的(新版本大部分都是用java實現的了),用上面的方案就是使用的時候new一個空間然后賦值給一個引用,釋放的時候把引用置為null等JVM GC處理就可以了。
看起來似乎也沒啥問題。但是在并發量比較高的時候就會頻繁的進行GC。我們都知道GC的時候有個stop the world,盡管最新的GC技術這個時間已經非常短,依然有可能成為生產環境的性能瓶頸。
kafka的設計者當然能考慮到這一層。下面我們就來學習下kafka是如何對batch進行管理的。
緩沖池技術原理解析
kafka客戶端使用了緩沖池的概念,預先分配好真實的內存塊,放在池子里。
每個batch其實都對應了緩沖池中的一個內存空間,發送完消息之后,batch不再使用了,就把內存塊歸還給緩沖池。
聽起來是不是很耳熟啊?不錯,數據庫連接池,線程池等池化技術其實差不多都是這樣的原理。通過池化技術降低創建和銷毀帶來的開銷,提升執行效率。
代碼是最好的文檔,,下面我們就來擼下源碼。
我們擼代碼的步驟采用的是從上往下的原則,先帶你看看緩沖池在哪里使用,然后再深入到緩存池內部深入分析。
下面的代碼做了一些刪減,值保留了跟本文相關的部分便于分析。
public class KafkaProducer<K, V> implements Producer<K, V> {private final Logger log;private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);private static final String JMX_PREFIX = "kafka.producer";public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";@Overridepublic Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptionsProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);...}當我們調用客戶端的發送消息的時候,底層會調用doSend,然后里面使用一個記錄累計器RecordAccumulator把消息append進來。我們繼續往下看看,
public final class RecordAccumulator {private final Logger log;private volatile boolean closed;private final AtomicInteger flushesInProgress;private final AtomicInteger appendsInProgress;private final int batchSize;private final CompressionType compression;private final int lingerMs;private final long retryBackoffMs;private final int deliveryTimeoutMs;private final BufferPool free;private final Time time;private final ApiVersions apiVersions;private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;private final IncompleteBatches incomplete;// The following variables are only accessed by the sender thread, so we don't need to protect them.private final Map<TopicPartition, Long> muted;private int drainIndex;private final TransactionManager transactionManager;private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock) throws InterruptedException {// We keep track of the number of appending thread to make sure we do not miss batches in// abortIncompleteBatches().appendsInProgress.incrementAndGet();ByteBuffer buffer = null;buffer = free.allocate(size, maxTimeToBlock);synchronized (dq) {// Need to check if producer is closed again after grabbing the dequeue lock.if (closed)throw new KafkaException("Producer closed while send in progress");RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));dq.addLast(batch);...RecordAccumulator其實就是管理一個batch隊列,我們看到append方法實現其實是調用BufferPool的free方法申請(allocate)了一塊內存空間(ByteBuffer), 然后把這個內存空空間包裝成batch添加到隊列后面。
當消息發送完成不在使用batch的時候,RecordAccumulator會調用deallocate方法歸還內存,內部其實是調用BufferPool的deallocate方法。
public void deallocate(ProducerBatch batch) {incomplete.remove(batch);// Only deallocate the batch if it is not a split batch because split batch are allocated outside the// buffer pool.if (!batch.isSplitBatch())free.deallocate(batch.buffer(), batch.initialCapacity());}很明顯,BufferPool就是緩沖池管理的類,也是我們今天要討論的重點。我們先來看看分配內存塊的方法。
public class BufferPool {static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";private final long totalMemory;private final int poolableSize;private final ReentrantLock lock;private final Deque<ByteBuffer> free;private final Deque<Condition> waiters;/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */private long nonPooledAvailableMemory;private final Metrics metrics;private final Time time;private final Sensor waitTime;public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {if (size > this.totalMemory)throw new IllegalArgumentException("Attempt to allocate " + size+ " bytes, but there is a hard limit of "+ this.totalMemory+ " on memory allocations.");ByteBuffer buffer = null;this.lock.lock();try {// check if we have a free buffer of the right size pooledif (size == poolableSize && !this.free.isEmpty())return this.free.pollFirst();// now check if the request is immediately satisfiable with the// memory on hand or if we need to blockint freeListSize = freeSize() * this.poolableSize;if (this.nonPooledAvailableMemory + freeListSize >= size) {// we have enough unallocated or pooled memory to immediately// satisfy the request, but need to allocate the bufferfreeUp(size);this.nonPooledAvailableMemory -= size;} else {// we are out of memory and will have to blockint accumulated = 0;Condition moreMemory = this.lock.newCondition();try {long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);this.waiters.addLast(moreMemory);// loop over and over until we have a buffer or have reserved// enough memory to allocate onewhile (accumulated < size) {long startWaitNs = time.nanoseconds();long timeNs;boolean waitingTimeElapsed;try {waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);} finally {long endWaitNs = time.nanoseconds();timeNs = Math.max(0L, endWaitNs - startWaitNs);recordWaitTime(timeNs);}if (waitingTimeElapsed) {throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");}remainingTimeToBlockNs -= timeNs;// check if we can satisfy this request from the free list,// otherwise allocate memoryif (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {// just grab a buffer from the free listbuffer = this.free.pollFirst();accumulated = size;} else {// we'll need to allocate memory, but we may only get// part of what we need on this iterationfreeUp(size - accumulated);int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);this.nonPooledAvailableMemory -= got;accumulated += got;}...首先整個方法是加鎖操作的,所以支持并發分配內存。
邏輯是這樣的,當申請的內存大小等于poolableSize,則從緩存池中獲取。這個poolableSize可以理解成是緩沖池的頁大小,作為緩沖池分配的基本單位。從緩存池獲取其實就是從ByteBuffer隊列取出一個元素返回。
如果申請的內存不等于特定的數值,則向非緩存池申請。同時會從緩沖池中取一些內存并入到非緩沖池中。這個nonPooledAvailableMemory指的就是非緩沖池的可用內存大小。非緩沖池分配內存,其實就是調用ByteBuffer.allocat分配真實的JVM內存。
緩存池的內存一般都很少回收。而非緩存池的內存是使用后丟棄,然后等待GC回收。
繼續來看看batch釋放的代碼,
public void deallocate(ByteBuffer buffer, int size) {lock.lock();try {if (size == this.poolableSize && size == buffer.capacity()) {buffer.clear();this.free.add(buffer);} else {this.nonPooledAvailableMemory += size;}Condition moreMem = this.waiters.peekFirst();if (moreMem != null)moreMem.signal();} finally {lock.unlock();}}很簡單,也是分為兩種情況。要么直接歸還緩沖池,要么就是更新非緩沖池部分的可以內存。然后通知等待隊列里的第一個元素。
總結
以上是生活随笔為你收集整理的带你了解下Kafka的客户端缓冲池技术的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 查看php的错误日志文件,php查看错误
- 下一篇: Newton迭代法求解Toeplitz矩