DelayedOperationPurgatory之DelayedOperation pool
purgatory就是煉獄的意思。
當一個DelayedOperation需要被delay時,它就被放到DelayedOperationPurgatory,相當于進行一個等待池。上一篇blog提到過,DelayedOperation想要擺脫delay狀態,需要由事件來觸發對它狀態的檢查,或者是超時時間到了。
這個邏輯看起來挺簡單,但高效實現卻挺復雜。0.9.0版本的Kafka重新設計了這個purgatory(相比0.8版本),設計思路在Kafka的這篇文檔Purgatory Redesign Proposa。我翻譯并注釋了一下,放在Kafka之Purgatory Redesign Proposal (翻譯)
purgatory的實現使用了兩個緩存,這里先講第一個。
直觀的,我們需要一個key綁定到DelayedOperation上,來說明這個DelayedOperation會由哪些事件觸發,而且一個DelayedOperation可以綁定到多個key, 一個key也可能跟多個DelayedOperation有關。所以這是一個多對多的映射,一邊是事件,一邊是DelayedOperation。
DelayedOperation pool
DelayedOperationPurgatory就提供了這種功能。它把這種映射保存到一個Pool里。這個Pool實際上是對一個ConcurrentHashMap的封裝.
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
Pool構造器的參數是一個value factory,即當這個pool里一個key沒有value時,就用這個函數生成一個value。所以,對于value為空的key,這個pool就會構造一個watch這個key的Watchers.
Watchers
這個Pool的key是Any類型,也就是Scala里所有對象的基類,value是Watchers。注釋里這么描述Watchers的
A linked list of watched delayed operations based on some key
Watchers把這些DelayedOperation保存在自己的一個instance field里
private[this] val operations = new LinkedList[T]()
由于Watchers是DelayedOperationPurgatory的內部類,T就源于DelayedOperationPurgatory的簽名
class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000)
extends Logging with KafkaMetricsGroup
所以operations這個域里保存的就是一個DelayedOperation的列表。
所以,DelayedOperationPurgatory用來保存DelayedOperation的數據結構是一個Map[Any, List[DelayedOpertion]]的結構。
對于Watchers來說,有兩件事要做
當它關注的key有事件發生時,需要調用它的方法來遍歷operations,找出其中可以被complete(即不再被delay)的operation。這個方法就是tryCompleteWatched
由于一個DelayedOperation可以對應多個key,所以當這個Watchers對應的key沒有被觸發,它保存的operations里的元素仍然可能由于其它的key觸發而而被complete。所以外界需要能主動地檢測這個Watchers里的哪些operation已經被complete了,并且移除這些元素。這個方法就是purgeCompleted。
做這兩件事情的實機很重要,下面來分析一下
第一件事,在產生事件的地方進行檢測就好。比如fetch線程處理fetch請求的過程,以及produce request的處理過程中,可以調用tryCompleteWatched。
第二件事的處理實機比較不好確定。因為當把一個request從某個key的watchers中移除以后,它可能還在另一個key的watchers里。而每次移除一個request,都要調用purgeCompleted顯然不現實。但是0.9.0的實現中引用了新的數據結構來對request的超時進行檢測,通過它可以準確獲得某個時刻在purgatory中的請求數量(但并不是server中的DelayedOperation的數量,因為超時的DelayedOperation會被放入一個線程池執行它的回調,所以總的數量還需要加上線程池中的Operation數量, 而且這個線程池是一個FixedThreadPool,它使用一個無界的queue)。具體的作法請參照Kafka之Purgatory Redesign Proposal (翻譯)
總結
以上是生活随笔為你收集整理的DelayedOperationPurgatory之DelayedOperation pool的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Qt for Android】Open
- 下一篇: 教你怎么屏蔽掉在移动端的宽带运营商的流量