Kafka Purgatory Redesign Proposal (Translation)
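Purgatory is an important data structure that the Kafka server uses when processing requests. I came across this article while studying the ReplicaManager source code, so I translated it along the way. Many things in this proposal only become clear after reading the source code, and there is quite a lot of code, so I will first briefly explain some of the concepts and principles to make the article below easier to follow.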
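1. The purgatory is used to hold delayed requests. These requests cannot be completed yet because some condition has not been met, so they are first put into the purgatory and taken out once the condition is satisfied.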
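2. A request can be satisfied by two kinds of conditions: (1) a business condition, such as the minimum number of bytes for a fetch; (2) a timeout. The two conditions correspond to two different caches. The first cache is a hash map whose key is the condition and whose value is the list of all requests waiting on that condition (the watcher list in the article; every request waiting on the key is a watcher). The second cache is a timer that actively completes a request once it times out.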
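3. Keys and requests in the first hash map have a many-to-many relationship. After finding a request through one key and completing it, we can remove it from that key's watcher list. That does not mean the request has left the first cache, because it may still be in the watcher lists of other keys; and since traversing all the watcher lists is expensive, we cannot scan the whole hash map every time an element is removed. The hash map therefore has to be cleaned up periodically, which is the purge operation mentioned in the article below. In 0.8.x, when to purge is decided by the total size of the watcher lists, but that size does not reflect the number of requests in the first cache, let alone the number of completed requests, and the number of completed requests is what should really drive the purge. The old scheme handled this poorly, which cost a lot of CPU and limited the purgatory's throughput. The new scheme partly solves the problem and is at least much better than 0.8.x.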
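4. Even when an element is removed from the second cache (the timeout queue), the corresponding entries in the first cache cannot be located directly and deleted. So expired requests cannot be removed from the first cache promptly either, which adds to the need to clean up the first cache.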
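5. The 0.8.x timer is implemented with a java.util.concurrent.DelayQueue, with each request wrapped as a DelayItem. Java's DelayQueue is backed by a priority queue whose insertion and deletion take O(log n) time, so when the DelayQueue is large, every insertion and deletion is relatively expensive. The new implementation uses a timing wheel whose buckets are doubly linked lists, which reduces the cost of inserting a request into the timer and deleting it to O(1) and also lowers CPU usage.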
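For comparison with the new design, here is a minimal Java sketch of the per-request DelayQueue timer described in point 5. It uses the real java.util.concurrent.DelayQueue and Delayed types, but the class names DelayedRequestItem and OldStyleTimer are hypothetical and this is not Kafka's actual 0.8.x code. The point it illustrates is that every pending request becomes its own heap entry.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical 0.8.x-style timer entry: one Delayed item per pending request.
class DelayedRequestItem implements Delayed {
    final String requestId;
    final long deadlineNanos; // absolute deadline on the System.nanoTime() clock

    DelayedRequestItem(String requestId, long timeoutMs) {
        this.requestId = requestId;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until the deadline; <= 0 means the request has expired.
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // The priority queue inside DelayQueue orders items by remaining delay.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Every pending request is a separate heap entry, so the heap grows with the
// number of requests and each offer/poll costs O(log n).
class OldStyleTimer {
    final DelayQueue<DelayedRequestItem> queue = new DelayQueue<>();

    void watch(String requestId, long timeoutMs) {
        queue.offer(new DelayedRequestItem(requestId, timeoutMs));
    }

    // Non-blocking: returns the id of an expired request, or null if none has expired.
    String pollExpired() {
        DelayedRequestItem item = queue.poll();
        return item == null ? null : item.requestId;
    }
}
```

With tens of thousands of in-flight requests, the heap holds tens of thousands of entries, which is the cost the timing wheel design below tries to avoid.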
Purgatory Redesign Proposal
Introduction
Kafka implements several request types that cannot immediately be answered with a response. Examples:
- A produce request with acks=all cannot be considered complete until all replicas have acknowledged the write and we can guarantee it will not be lost if the leader fails.
- A fetch request with min.bytes=1 won't be answered until there is at least one new byte of data for the consumer to consume. This allows a "long poll" so that the consumer need not busy wait checking for new data to arrive.
These requests are considered complete when either (a) the criteria they requested are met or (b) some timeout occurs.
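The two completion paths can be sketched in Java as follows. This is a hypothetical illustration (DelayedRequest, DelayedFetchSketch, tryComplete and onExpiration are assumed names, not Kafka's API): whichever of (a) or (b) happens first wins a compare-and-set on an AtomicBoolean, so a request is answered exactly once even if the criteria check and the timeout race each other.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical delayed request: finished either by its criteria (a) or by a timeout (b).
abstract class DelayedRequest {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    String outcome; // records which path completed the request, for illustration only

    // Only the first caller wins; every later call returns false and does nothing.
    boolean forceComplete(String reason) {
        if (completed.compareAndSet(false, true)) {
            outcome = reason;
            return true;
        }
        return false;
    }

    boolean isCompleted() {
        return completed.get();
    }

    // (a) Criteria check, invoked when a relevant event happens.
    abstract boolean tryComplete();

    // (b) Invoked by the timer when the timeout fires.
    boolean onExpiration() {
        return forceComplete("timeout");
    }
}

// Hypothetical fetch-like request: done once at least minBytes are available.
class DelayedFetchSketch extends DelayedRequest {
    final int minBytes;
    int availableBytes;

    DelayedFetchSketch(int minBytes) {
        this.minBytes = minBytes;
    }

    void onNewData(int bytes) {
        availableBytes += bytes;
    }

    @Override
    boolean tryComplete() {
        return availableBytes >= minBytes && forceComplete("criteria met");
    }
}
```

With min.bytes=1, as in the fetch example above, the first byte of new data completes the request, and a timeout firing afterwards becomes a no-op.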
We intend to expand the use of this delayed request facility for various additional purposes including partition assignment and potentially quota enforcement.
The number of these asynchronous operations in flight at any time scales with the number of connections, which for Kafka is often tens of thousands.
A naive implementation of these would simply block the thread on the criteria, but this would not scale to the high number of in flight requests Kafka has.
The current approach uses a data structure called the "request purgatory". The purgatory holds any request that hasn't yet met its criteria to succeed but also hasn't yet resulted in an error. This structure holds onto these uncompleted requests and allows non-blocking event-based generation of the responses. This approach is obviously better than having a thread per in-flight request but our implementation of the data structure that accomplishes this has a number of deficiencies. The goal of this proposal is to improve the efficiency of this data structure.
Current Design
The request purgatory consists of a timeout timer and a hash map of watcher lists for event-driven processing. A request is put into a purgatory when it is not immediately satisfiable because of unmet conditions. A request in the purgatory is completed later when the conditions are met, or is forced to complete (timeout) once it has passed the time specified in the timeout parameter of the request. Currently (0.8.x) it uses a Java DelayQueue to implement the timer.
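A minimal Java sketch of the hash map of watcher lists, assuming string keys (for example a topic-partition name) and a boolean flag standing in for each request's real criteria; WatcherMap, WatchedRequest and checkAndComplete are hypothetical names. It also shows the many-to-many issue from the translator's notes: completing a request through one key removes it only from that key's list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical request: "satisfied" stands in for whatever criteria the request has.
class WatchedRequest {
    final String id;
    boolean satisfied;
    boolean completed;

    WatchedRequest(String id) {
        this.id = id;
    }
}

// Sketch of the hash map of watcher lists: key -> requests waiting on that key.
class WatcherMap {
    final Map<String, List<WatchedRequest>> watchers = new HashMap<>();

    // A request may wait on several keys, so it is appended to several lists.
    void watch(WatchedRequest r, List<String> keys) {
        for (String key : keys) {
            watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
        }
    }

    // Event-driven check: when something changes for `key`, only that list is scanned.
    // Completed requests are dropped from this list, but NOT from other keys' lists.
    int checkAndComplete(String key) {
        List<WatchedRequest> list = watchers.get(key);
        if (list == null) return 0;
        int newlyCompleted = 0;
        for (Iterator<WatchedRequest> it = list.iterator(); it.hasNext(); ) {
            WatchedRequest r = it.next();
            if (!r.completed && r.satisfied) {
                r.completed = true;
                newlyCompleted++;
            }
            if (r.completed) it.remove();
        }
        return newlyCompleted;
    }
}
```

A request watched on two keys and completed through the first one stays in the second key's list until that list is checked or purged.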
When a request is completed, it is not deleted from the timer or watcher lists immediately. Instead, completed requests are deleted when they are found during condition checking. When the deletion does not keep up, the server may exhaust the JVM heap and cause an OutOfMemoryError. To alleviate the situation, the reaper thread purges completed requests from the purgatory when the number of requests in the purgatory (including both pending and completed requests) exceeds the configured number. The purge operation scans the timer queue and all watcher lists to find completed requests and deletes them.
By setting this configuration parameter low, the server can virtually avoid the memory problem. However, the server must pay a significant performance penalty if it scans all lists too frequently.
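The following Java sketch illustrates this 0.8.x-style trigger under simplifying assumptions: the names are hypothetical, and a plain int stands in for the configured purge threshold. Because the trigger is the total size of all watcher lists, purely pending requests can cause repeated full scans that remove nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a purge triggered by the total size of the watcher lists.
class OldPurgatorySketch {
    static final class Req {
        boolean completed;
    }

    final Map<String, List<Req>> watchers = new HashMap<>();
    final int purgeThreshold; // stands in for the configured purge setting
    int purges;               // number of full scans performed, to show the cost

    OldPurgatorySketch(int purgeThreshold) {
        this.purgeThreshold = purgeThreshold;
    }

    void watch(Req r, String... keys) {
        for (String k : keys) watchers.computeIfAbsent(k, x -> new ArrayList<>()).add(r);
        maybePurge();
    }

    // Counts pending AND completed requests, once per key a request is watched on.
    int totalWatched() {
        int n = 0;
        for (List<Req> l : watchers.values()) n += l.size();
        return n;
    }

    void maybePurge() {
        if (totalWatched() <= purgeThreshold) return;
        purges++;
        // Full scan of every list, even if almost nothing in them is completed.
        for (List<Req> l : watchers.values()) l.removeIf(r -> r.completed);
    }
}
```

For example, with a threshold of 4, watching four still-pending requests on two keys each triggers two full scans that delete nothing; a low threshold saves memory but multiplies such scans.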
New Design
The goal of the new design is to allow immediate deletion of a completed request and to significantly reduce the load of the expensive purge process. It requires cross-referencing between timer entries and requests. Also, O(1) insert/delete cost is strongly desired, since insert/delete operations happen for each request/completion.
To satisfy these requirements, we propose a new purgatory implementation based on Hierarchical Timing Wheels.
Hierarchical Timing Wheel
A simple timing wheel is a circular list of buckets of timer tasks. Let u be the time unit. A timing wheel with size n has n buckets and can hold timer tasks in an n * u time interval. Each bucket holds timer tasks that fall into the corresponding time range. At the beginning, the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), …, the n-th bucket for [u * (n - 1), u * n). Every time unit u, the timer ticks, moves to the next bucket, and expires all timer tasks in it. So the timer never inserts a task into the bucket for the current time, since it is already expired; the timer immediately runs the expired task. The emptied bucket is then available for the next round, so if the current bucket is for the time t, it becomes the bucket for [t + u * n, t + (n + 1) * u) after a tick. A timing wheel has O(1) cost for insert/delete (start-timer/stop-timer), whereas priority-queue-based timers, such as java.util.concurrent.DelayQueue and java.util.Timer, have O(log n) insert/delete cost.
(Translator's note: "expires all timer tasks in it" refers to the tasks in the bucket the timer has just passed. "Never inserts a task into the bucket for the current time" means that if a task's expiration falls into the current bucket, the timer does not insert it, because the task is already considered expired; this clarifies what the current bucket means.)
A major drawback of a simple timing wheel is that it assumes that a timer request is within the time interval of n * u from the current time. If a timer request is out of this interval, it is an overflow. A hierarchical timing wheel deals with such overflows. It is a hierarchy of timing wheels. The lowest level has the finest time resolution. Moving up the hierarchy, time resolutions become coarser. If the resolution of a wheel at one level is u and the size is n, the resolution of the next level should be n * u. At each level, overflows are delegated to the wheel one level higher. When the wheel in the higher level ticks, it reinserts timer tasks into the lower level. An overflow wheel can be created on demand. When a bucket in an overflow wheel expires, all tasks in it are reinserted into the timer recursively. The tasks are then moved to the finer-grained wheels or executed. The insert (start-timer) cost is O(m), where m is the number of wheels, which is usually very small compared to the number of requests in the system, and the delete (stop-timer) cost is still O(1).
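The insertion rule can be sketched in Java as below. This is a simplified, hypothetical TimingWheelSketch, not Kafka's timing wheel class: buckets store bare expiration times, the clock never advances, and there is no reinsertion on ticks. It only shows which bucket a task lands in and how overflows are delegated to a lazily created wheel whose tick is n * u.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hierarchical timing wheel, insertion only.
class TimingWheelSketch {
    final long tickMs;      // u: time unit of this level
    final int wheelSize;    // n: number of buckets
    final long intervalMs;  // n * u: span covered by this level
    final long currentTime; // rounded down to a multiple of tickMs (never advanced here)
    final List<List<Long>> buckets = new ArrayList<>();
    TimingWheelSketch overflowWheel; // created on demand, resolution n * u

    TimingWheelSketch(long tickMs, int wheelSize, long startMs) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.intervalMs = tickMs * wheelSize;
        this.currentTime = startMs - (startMs % tickMs);
        for (int i = 0; i < wheelSize; i++) buckets.add(new ArrayList<>());
    }

    // Returns the level (0 = finest) the task was placed in, or -1 if it is
    // already expired and should be run immediately instead of inserted.
    int add(long expirationMs) {
        if (expirationMs < currentTime + tickMs) {
            return -1; // falls into the current bucket: already expired
        } else if (expirationMs < currentTime + intervalMs) {
            long virtualId = expirationMs / tickMs; // absolute tick number
            buckets.get((int) (virtualId % wheelSize)).add(expirationMs); // circular index
            return 0;
        } else {
            // Overflow: delegate to the next, coarser wheel.
            if (overflowWheel == null) {
                overflowWheel = new TimingWheelSketch(intervalMs, wheelSize, currentTime);
            }
            // The overflow wheel never reports -1 here, since expirationMs >= its current tick end.
            return 1 + overflowWheel.add(expirationMs);
        }
    }
}
```

With u = 1 ms and n = 20 (the benchmark parameters below), an expiration of 5 ms lands in level 0, 200 ms in level 1 (tick 20 ms), 400 ms in level 2 (tick 400 ms), and 0 ms is reported as already expired.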
Doubly Linked List for Buckets in Timing Wheels
(Translator's note: that is, the buckets of the timing wheels are implemented as doubly linked lists.)
In this design, we propose to use our own implementation of a doubly linked list for the buckets in a timing wheel. The advantage of a doubly linked list is that it allows O(1) insertion/deletion of a list item if we have access to its link cell in the list. A timer task saves its link cell in itself when it is enqueued to a timer queue. When a task is completed or canceled, the list is updated using the link cell saved in the task itself.
(Translator's note: if we hold a reference to an entry of the list, a doubly linked list supports O(1) insertion and deletion; the comparison here is mainly with a singly linked list. The real advantage of the doubly linked list is that deletion is cheap. When a timer task is put into the timer queue, it is also added to a doubly linked list, and the task keeps a reference to the entry it occupies. So when an event leads us to a timer task and completes it, that reference locates the task's entry, which can then be unlinked in O(1).)
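A hypothetical Java sketch of such a bucket: an intrusive doubly linked list with a sentinel node, where each task stores its own link cell (TimerTaskEntry) when it is enqueued. The names are illustrative only. Cancelling a task unlinks that cell directly, without searching the list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bucket: doubly linked list with a sentinel node.
class TimerTaskList {
    final TimerTaskEntry root = new TimerTaskEntry(null); // sentinel: root.next = head, root.prev = tail
    int size;

    TimerTaskList() {
        root.next = root;
        root.prev = root;
    }

    // O(1) append at the tail; the task remembers its link cell.
    void add(TimerTaskSketch task) {
        TimerTaskEntry e = new TimerTaskEntry(task);
        e.list = this;
        e.prev = root.prev;
        e.next = root;
        root.prev.next = e;
        root.prev = e;
        task.entry = e;
        size++;
    }

    // O(1) removal through the saved link cell; no search through the list.
    void remove(TimerTaskEntry e) {
        if (e.list != this) return; // already removed
        e.prev.next = e.next;
        e.next.prev = e.prev;
        e.next = null;
        e.prev = null;
        e.list = null;
        size--;
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (TimerTaskEntry e = root.next; e != root; e = e.next) out.add(e.task.name);
        return out;
    }
}

// Link cell: one per task per bucket membership.
class TimerTaskEntry {
    final TimerTaskSketch task;
    TimerTaskEntry prev, next;
    TimerTaskList list; // the bucket this cell currently belongs to, or null

    TimerTaskEntry(TimerTaskSketch task) {
        this.task = task;
    }
}

class TimerTaskSketch {
    final String name;
    TimerTaskEntry entry; // link cell saved when the task is enqueued

    TimerTaskSketch(String name) {
        this.name = name;
    }

    // Called when the request completes or is cancelled before its timeout.
    void cancel() {
        if (entry != null && entry.list != null) entry.list.remove(entry);
        entry = null;
    }
}
```

The sentinel avoids special cases for an empty list or for removing the head or tail.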
Driving Clock using DelayQueue
A simple implementation may use a thread that wakes up every time unit and does the ticking, which checks whether there is any task in the bucket. This can be wasteful if requests are sparse. We want the thread to wake up only when there is a non-empty bucket to expire. We will do so by using java.util.concurrent.DelayQueue similarly to the current implementation, but we will enqueue task buckets instead of individual tasks. This design has a performance advantage: the number of items in DelayQueue is capped by the number of buckets, which is usually much smaller than the number of tasks, so the number of offer/poll operations on the priority queue inside DelayQueue will be significantly smaller.
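Below is a hedged Java sketch of this idea, using the real java.util.concurrent.DelayQueue but with hypothetical class names and a single wheel level (no overflow wheel). To keep it deterministic, buckets read a fake millisecond clock; a real driver thread would instead block in DelayQueue.poll(timeout, unit). A bucket is offered to the queue only when its expiration changes, so 999 tasks spread over three ticks produce just three queue entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Fake millisecond clock so the sketch is deterministic (an assumption for illustration).
class FakeClock {
    long nowMs;
}

// Hypothetical bucket: the unit stored in the DelayQueue instead of individual tasks.
class TimerBucket implements Delayed {
    final FakeClock clock;
    final List<Long> tasks = new ArrayList<>(); // expiration times of the tasks in this bucket
    long expirationMs = -1;                     // -1: not currently scheduled

    TimerBucket(FakeClock clock) {
        this.clock = clock;
    }

    // Returns true if the expiration changed, meaning the bucket must be offered to the queue.
    boolean setExpiration(long ms) {
        boolean changed = expirationMs != ms;
        expirationMs = ms;
        return changed;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expirationMs - clock.nowMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(expirationMs, ((TimerBucket) other).expirationMs);
    }
}

// Single-level wheel whose clock is driven by a DelayQueue of buckets.
class BucketDrivenTimer {
    final long tickMs;
    final TimerBucket[] buckets;
    final FakeClock clock;
    final DelayQueue<TimerBucket> queue = new DelayQueue<>();

    BucketDrivenTimer(long tickMs, int wheelSize, FakeClock clock) {
        this.tickMs = tickMs;
        this.clock = clock;
        this.buckets = new TimerBucket[wheelSize];
        for (int i = 0; i < wheelSize; i++) buckets[i] = new TimerBucket(clock);
    }

    // Returns false if the task is already due or beyond this wheel's span.
    boolean add(long expirationMs) {
        long current = clock.nowMs - (clock.nowMs % tickMs);
        if (expirationMs < current + tickMs || expirationMs >= current + tickMs * buckets.length) {
            return false;
        }
        long virtualId = expirationMs / tickMs;
        TimerBucket bucket = buckets[(int) (virtualId % buckets.length)];
        bucket.tasks.add(expirationMs);
        // Only the first task of this round schedules the bucket; later tasks cost no heap work.
        if (bucket.setExpiration(virtualId * tickMs)) queue.offer(bucket);
        return true;
    }

    // Moves the clock forward and flushes every bucket that has become due.
    List<Long> advanceTo(long nowMs) {
        clock.nowMs = nowMs;
        List<Long> expired = new ArrayList<>();
        for (TimerBucket b = queue.poll(); b != null; b = queue.poll()) {
            expired.addAll(b.tasks);
            b.tasks.clear();
            b.setExpiration(-1); // free for reuse in a later round
        }
        return expired;
    }
}
```

In this sketch advanceTo is the only way the clock moves forward, and it flushes every due bucket before that bucket can be reused for a later round, so a bucket's expiration is never changed while it is still inside the DelayQueue.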
Purge of Watcher Lists
In the current implementation, the purge operation of watcher lists is triggered by the total size of the watcher lists. The problem is that the watcher lists may exceed the threshold even when there aren't many requests to purge. When this happens, it increases the CPU load a lot. Ideally, the purge operation should be triggered by the number of completed requests in the watcher lists.
In the new design, a completed request is removed from the timer queue immediately with O(1) cost. It means that the number of requests in the timer queue is exactly the number of pending requests at any time. So, if we know the total number of distinct requests in the purgatory, which is the number of pending requests plus the number of completed but still watched requests, we can avoid unnecessary purge operations. It is not trivial to keep track of the exact number of distinct requests in the purgatory because a request may or may not be watched. In the new design, we estimate the total number of requests in the purgatory rather than trying to maintain the exact number.
The estimated number of requests is maintained as follows. The estimated total number of requests, E, is incremented whenever a new request is watched. Before starting the purge operation, we reset the estimated total number of requests to the size of the timer queue. If no requests are added to the purgatory during the purge, E is the correct number of requests after the purge. If some requests are added to the purgatory during the purge, E is incremented to E + the number of newly watched requests. This may be an overestimation, because some of the new requests may be completed and removed from the watcher lists during the purge operation. We expect both the chance and the amount of overestimation to be small.
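The bookkeeping for E can be sketched as follows. Treating E minus the timer-queue size as the number of completed-but-still-watched requests, and purging only when that exceeds a threshold, is an assumption drawn from the paragraphs above rather than code taken from the proposal; PurgeEstimator and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the estimated-count bookkeeping.
// timerSize stands for the number of requests in the timer queue (= pending requests).
class PurgeEstimator {
    final AtomicInteger estimatedTotal = new AtomicInteger(); // E
    final int purgeThreshold; // assumed knob: completed-but-watched requests to tolerate

    PurgeEstimator(int purgeThreshold) {
        this.purgeThreshold = purgeThreshold;
    }

    // E is incremented whenever a new request is watched.
    void onWatched() {
        estimatedTotal.incrementAndGet();
    }

    // E - pending approximates the completed requests still sitting in watcher lists.
    boolean shouldPurge(int timerSize) {
        return estimatedTotal.get() - timerSize > purgeThreshold;
    }

    // Before purging, E is reset to the timer size. Requests watched during the purge
    // keep incrementing E, which is why the result may slightly overestimate.
    void beforePurge(int timerSize) {
        estimatedTotal.set(timerSize);
    }
}
```

For example, with 1000 watched requests and 950 still in the timer queue, only about 50 completed requests linger, so no purge runs, whereas a trigger based on the total watcher-list size would see at least 1000 entries.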
Parameters
- the tick size (the minimum time unit)
- the wheel size (the number of buckets per wheel)
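A small hypothetical helper showing how the two parameters interact: level k has tick u * n^k and spans u * n^(k+1), so the number of wheels m needed for a delay (measured here from the start of the current tick, an assumption) grows only logarithmically with the delay. WheelParams is an illustrative name, not part of the proposal.

```java
// Hypothetical helper: what each wheel level covers for a given tick size and wheel size.
final class WheelParams {
    private WheelParams() {}

    // Span covered by wheel `level` (0 = finest): tickMs * wheelSize^(level + 1).
    static long levelSpanMs(long tickMs, int wheelSize, int level) {
        long span = tickMs * wheelSize;
        for (int i = 0; i < level; i++) span *= wheelSize;
        return span;
    }

    // Number of wheels m needed so that a delay of delayMs fits; this is the m in O(m) insert.
    static int levelsNeeded(long tickMs, int wheelSize, long delayMs) {
        int levels = 1;
        long span = tickMs * wheelSize;
        while (delayMs >= span) {
            span *= wheelSize;
            levels++;
        }
        return levels;
    }
}
```

With the benchmark's tick of 1 ms and wheel size 20, the levels span 20 ms, 400 ms and 8000 ms, so the 200 ms timeout needs m = 2 wheels and a 30 s delay would need 4.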
Benchmark
We compared the enqueue performance of two purgatory implementations, the current implementation and the proposed new implementation. This is a micro benchmark. It measures the purgatory enqueue performance. The purgatory was separated from the rest of the system and used a fake request that does nothing useful. So, the throughput of the purgatory in a real system may be much lower than the number shown by the test.
In the test, the intervals between requests are assumed to follow an exponential distribution. Each request takes a time drawn from a log-normal distribution. By adjusting the shape of the log-normal distribution, we can test different timeout rates.
(Translator's note: the time each request takes should be the time needed to complete it, i.e. from entering the purgatory to being completed.)
The tick size is 1ms and the wheel size is 20. The timeout was set to 200ms. The data size of a request was 100 bytes. For a low timeout rate case, we chose 75th percentile = 60ms and 50th percentile = 20ms. For a high timeout rate case, we chose 75th percentile = 400ms and 50th percentile = 200ms. A total of 1 million requests are enqueued in each run.
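To see how these percentiles translate into timeout rates, the Java sketch below assumes completion times are exactly log-normal with the quoted 50th and 75th percentiles and a 200 ms timeout; this derivation is not part of the proposal. Since the median equals e^mu and the 75th percentile equals e^(mu + 0.6745 * sigma), the two percentiles fix mu and sigma, and the timeout rate is P(T > 200 ms). erf is approximated with Abramowitz and Stegun formula 7.1.26, because java.lang.Math has no error function; TimeoutRate is a hypothetical name.

```java
// Hypothetical calculation under the log-normal assumption stated above.
final class TimeoutRate {
    private TimeoutRate() {}

    static final double Z75 = 0.6744897501960817; // standard normal 75th-percentile z-score

    // Abramowitz & Stegun 7.1.26 approximation of erf (absolute error < 1.5e-7), odd symmetry for x < 0.
    static double erf(double x) {
        double sign = Math.signum(x);
        x = Math.abs(x);
        double t = 1.0 / (1.0 + 0.3275911 * x);
        double poly = t * (0.254829592 + t * (-0.284496736 + t * (1.421413741
                + t * (-1.453152027 + t * 1.061405429))));
        return sign * (1.0 - poly * Math.exp(-x * x));
    }

    static double normalCdf(double z) {
        return 0.5 * (1.0 + erf(z / Math.sqrt(2.0)));
    }

    // P(T > timeout) for log-normal T with the given median (p50) and 75th percentile (p75).
    static double timeoutRate(double p50Ms, double p75Ms, double timeoutMs) {
        double mu = Math.log(p50Ms);                  // median = e^mu
        double sigma = Math.log(p75Ms / p50Ms) / Z75; // p75 = e^(mu + Z75 * sigma)
        return 1.0 - normalCdf((Math.log(timeoutMs) - mu) / sigma);
    }
}
```

Under these assumptions, roughly 8% of requests time out in the low-timeout case, and exactly half in the high-timeout case, since there the median equals the 200 ms timeout.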
Requests are actively completed by a separate thread. Requests that are supposed to be completed before timeout are enqueued to another DelayQueue. And a separate thread keeps polling and completes them. There is no guarantee of accuracy in terms of actual completion time.
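(Translator's note: this paragraph describes how requests are completed in the benchmark: requests that will not time out are put into a DelayQueue, and a thread keeps polling requests from it and completing them. But as mentioned earlier, DelayQueue's poll costs O(log n), so might this approach itself add CPU load? Especially since, when requests are actually completed in a real system, they are looked up from the hash map, which is much cheaper.)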
The JVM heap size is set to 200m to reproduce a memory tight situation.
The result shows a dramatic difference in the high enqueue rate area. As the target rate increases, both implementations keep up with the requests initially. However, in the low timeout scenario the old implementation was saturated around 40000 RPS (requests per second), whereas the proposed implementation didn't show any significant performance degradation; and in the high timeout scenario the old implementation was saturated around 25000 RPS, whereas the proposed implementation was saturated around 105000 RPS in this benchmark.
(Translator's note: it seems that lowering the cost of inserting and removing timer tasks, together with a more efficient purge, greatly increases how fast requests can move in and out of the purgatory. However, not every request is delayed, and some delayed operations come from the fetch requests of other replicas, so these numbers do not represent the RPS of a Kafka server.)
CPU usage is significantly better in the new implementation.
Finally, we measured total GC time (milliseconds) for ParNew collection and CMS collection. There isn't much difference between the old implementation and the new implementation in the range of enqueue rates that the old implementation can sustain.
(Translator's note: ParNew collects the young generation and CMS collects the old generation.)
Summary
In the new design, we use Hierarchical Timing Wheels for the timeout timer and a DelayQueue of timer buckets to advance the clock on demand. Completed requests are removed from the timer queue immediately with O(1) cost. The buckets remain in the delay queue; however, the number of buckets is bounded. And, in a healthy system, most of the requests are satisfied before timeout, and many of the buckets become empty before they are pulled out of the delay queue. Thus, the timer should rarely have the buckets of the lower interval. The advantage of this design is that the number of requests in the timer queue is exactly the number of pending requests at any time. This allows us to estimate the number of requests that need to be purged, so we can avoid unnecessary purge operations on the watcher lists. As a result, we achieve higher scalability in terms of request rate with much better CPU usage.
Reposted from: https://www.cnblogs.com/devos/p/5060271.html