6.3.3 延迟缓存
副本管理器針對生產請求和l拉取請求都有一個全局的延遲緩存,生產請求對應延遲緩存中存儲了延遲的生產(DelayedProduce),拉取請求對應延遲緩存中存儲了延遲的拉取(DelayedFetch)。Kafka的延遲緩存數據結構(DelayedOperatlonPurgatory)和上一節的Purgatory類似。下面的代碼片段以延遲的生產和拉取為例,列舉了副本管理器中,與延遲緩存、延遲操作相關的方法:
延遲緩存除了管理延遲操作,還要從分區角度嘗試完成延遲的操作,延遲緩存主要有下面兩個方法。
- tryCompleteElseWatch()方法。嘗試完成延遲的操作,如果不能完成,將延遲操作加入延遲緩存中。一旦將延遲操作加入延遲緩存的監控,延遲操作的每個分區都會監視該延遲操作。
- checkAndComplete()方法。它的參數不是延遲操作對象,而是延遲緩存的鍵(分區)。外部事件調用該方法,根據指定的鍵(分區),嘗試完成延遲緩存中的延遲操作。
注意:本章分析的延遲生產和延遲拉取,在豆豆遲緩存中的鍵都是分區,但延遲緩存的鍵并不一定就是分區。比如上一幸延遲的加入和延遲的心跳,在延遲緩存中的鍵分別是消費組和消費者編號,而不是分區。
服務端創建的延遲操作有多個分區,在加入到延遲緩存時,每個分區都對應相同的延遲操作。服務端在剛創建延遲操作時,因為沒有滿足條件,所以才會創建延遲的操作。以6.3.2節“4.延遲生產的示例”為例,服務端處理生產請求,將消息集寫到分區[凹,凹,P3],并創建了延遲的生產。服務端將延遲的生產加入到延遲緩存中,正常的結果是[Pl->DelayedProduce,PZ->DelayedProduce,P3->DelayedProduce]。但如果在加入的過程中,延遲的生產滿足了條件,即3個分區的備份副本都同步了主副本的消息,那么服務端就不需要再監控這個延遲的操作了。比如服務端將Pl->DelayedProduce加入延遲緩再后,延遲的生產可以完成,那么剩下的[PZ->DelayedProduce,P3->DelayedProduce]就不會被1111入延遲緩存了。
延遲緩存的tryCoMpleteElseWatch()方法將延遲操作加入延遲緩存之前,會先嘗試一次完成延遲的操作。如果不能完成,才會調用watchForOperation()方法將延遲操作加入到分區對應的監視器(Watchers)。在這之后,還會再次嘗試一次完成延遲的操作,如果還不能完成,才會將延遲操作加入定時幫(Timer)。相關代碼如下:
延遲操作不僅存在于延遲緩存中,還會被定時器監控。延遲操作在延遲緩存中的生命周期分別與外部事件、定時持有關。下面兩點解釋了延遲操作在延遲緩存中的生命周期。
- 將延遲操作加入延遲緩存,目的是讓外部事件有機會嘗試完成延遲的操作。當滿足條件,可以完成延遲操作H才,服務端才會返回響應結果給客戶端,并將延遲操作從延遲緩存中刪除。
- 將延遲操作力[I入定時器,目的是在延遲操作超時后,服務端可以強制返回響應結果給客戶端。注意:延遲緩存的作用是:外部事件可以根據分區,嘗試完成監視器的所有延遲操作。定時器的作用是:在延遲操作超時后,強制完成延遲的操作。兩者都保存了延遲操作,但前者有分區,后者沒有分區。被定時器監控的延遲操作,并不需要分區,因為定時器與分區無關。
延遲緩’存的每個鍵都有一個監視器,它管理了鏈表結構的延遲操作。外部事件發生時,會給定一個鍵,然后i用用這個健對應監視器的tryCompleteWatched()方法,嘗試完成監視器中所有的延遲操作。監視器嘗i式完成所有延遲操作的過程中,會調用每個延遲操作的tryComplete()方法,判斷能否完成延遲的操作。如果某個延遲操作能夠完成,貝lj將對應的延遲操作從鏈表中移除。相關代碼如下:
6.3.2節提到,外部事件根據指定分區嘗試完成延遲的操作。如果延遲操作可以完成,只會從延遲緩存中刪除這個分區中已經完成的延遲操作,并不會刪除其他分區中已經完成的延遲操作。監視器的purgeC0111pleted()方法會清理所有已經完成的延遲操作,這個方法會被清理線程調用。
如圖6-64所示,以6.3.2節的延遲拉取為例,外部事件嘗試完成分區凹的延遲操作,可以完成DelayedFetch2和DelayedFetch4,它們會立即從延遲緩存中刪除。另外,定時的清理線程會檢查所有的監視器,在檢查到DelayedFetch2和DelayedFetch4時,才會將其從分區P2和分區內的監視器中移除。
下面對比了監視器嘗試完成延遲的操作、清理已完成的延遲操作兩個方法的不同點。
- 嘗試完成時會先判斷延遲操作是否已經完成,如果沒有,則調用每個延遲操作的tryComplete()方法。這兩者的返回值只要是true,就會刪除當前的延遲操作。
- 清理已完成的延遲操作,并不會調用延遲操作的tryComplete()方法,而是直接判斷延遲操作是否已經完成,如果是,則從監視器中刪除當前的延遲操作。
清理線程的作用是清理所有監視器中已經完成的延遲操作。它作為延遲緩存的內部類,需要訪問延遲緩存的watchersForKey成員變量才能正常地展開工作。另外,清理器每次運行時都會增加定時器的時鐘。下面列汁1了清理器與延遲緩存、定時器相關的代碼:
延遲緩存的tryCompleteElseWatch()方法在將延遲操作加入指定鍵的監視器后,會增加esti~時edTotalOperations計數器,并往定時器的延遲隊列中添加延遲的操作。清理線程的運行方法根據計數器的值減去定時器的大小(delayed變量),正常來看這個差距會等于零。
但實際上,清理器在運行時會先調用定時器的advanceClock()方法,將定時器的時鐘往前移動一次。定時器在運行時,如果延遲的操作超時了,就會將延遲操作從定時器的延遲隊列中移除。一旦延遲操作從定時器中刪除,定時器的大小就會減少,那么計數器減去定時器的大小就會大于零。最后,清理線程就會滿足“差距大于purgeinterval這個條件”,開始清理延遲緩存中所有的監視器。
Kafka服務端創建的延遲操作(DelayedOperation)會作為一個定時任務(TifilerTask),加入定時器(T"imer)的延遲隊列(DelayQueue)。當延遲操作超時后,定時器會將延遲操作從延遲隊列中彈:-H,并調用延遲操作的運行方法,強制完成延遲的操作。
定時器使用“延遲隊列”管理服務端創建的所有延遲操作,延遲隊列的每個元素是定時任務列表(TimerTaskli.st),一個定時任務列表可以存放多個定時任務條目(Ti.merTaskEntry)。服務端創建的延遲操作對象,會先包裝成定時任務條目,然后才會加入延遲隊列指定的一個定時任務列表。“延遲隊列”是定時器中保存“定時任務列表”的全局數據結構,但服務端創建的“延遲操作”不是直接加入“定時任務列表”,而是加入到“時間輪”(Ti.mi.ngWheel),延遲隊列和時間輪之間的關系如下。
列中。 (2)超時的定時任務列表會被延遲隊列的poll()方法彈:-H。定時任務列表超時并不一定表示定時任務超時,將定時任務重新加入時間輪,如果加入失敗,說明定時任務的確超時,通過錢程池執行任務。 (3)執行延遲操作對應的定時任務,只在定時器的addTi.merTaskEntry()方法f~I調用。所以在advanceClock()方法中將定時任務列表從延遲隊列中彈出后,調用定時任務列表的flush()方法將所有的定時任務重新加入時間輪,這樣才有機會執行超時的定時任務。 (4)延遲隊列的poll()方法只會彈出超時的定時任務列表,隊列中的每個元素按照超時時間排序,如果第一個定時任務列表都沒有過期,那么其他定時任務歹lj表也一定不會超時。假設調用advanceClock()方法時,第一次調用延遲隊列的poll()方法會彈II\一個超時的定時任務列表,第二次調用延遲隊列沒有參數的poll()方法沒有超時的定時任務列表,就不會再彈州定時任務列表了。定時器的相關代碼如下:
注意:延遲操作本身的失效時間(expirationMs)等于客戶端請求設直的延遲時間(delayMs)加上當前時間,它是一個絕對的時間戳。比如客戶端請求設置的延這時間是10秒,當前時間是2017-1-110:00:00,那么延遲操作的失效時間等于2017-1-110:00:10。Java的延遲隊列是一個基于時間的優先級隊列,延遲隊列的元素(即每個定時任務列表)都有一個失效時間,這個失效時間也是一個絕對的時間截。不過,定時任務列表在實現Delayed接- 的getDelay()方法,則妥將絕對的失效時間減去當前時間,表示定時任務列表在多長時間之后會過期。當getDelay()方法返回值小于等于零時,就表示定時任務列表已經過期,需妥立即衫t-1于。
時間輪類似于一個環形緩沖區,不同的是,加入環形緩沖區的數據只能順序加入,而加入時間輪的數據可以不按順序加入。并且,如果當前時間輪放不下加入的數據時,它會創建一個更高層的時間輪。第一層時間輪的tickMs=l表示一格的長度是l毫秒,wheelSize=20表示一共20格,它的范圍是20毫秒。第二層時間輪的tickMs=20表示一格的長度是20毫秒,它的范圍是400毫秒。如圖6-65所示,假設有5個定時任務,它們的超時時間分別是[8,8,25,3日,35]。前2個定時任務會加入到第一個時間輪的第八個桶,后3個定時任務會加入到第二個時間輪的第一個桶中。
定時器只持有第一層時間輪的引用,并不會持有其他更高層的時間輪。比如上面的示例中,第一層時間輪會持有第二層時間輪的引用,如果還有第三層時間輪,則第二層時間輪會持有第三層時間輪的引用定時器將定時任務加入當前時間輪,要判斷定時任務的失效時間是再在當前時間輪的范圍內。如果不在當前時間輪的范圍內,則要將定時任務上升到更高一層的時間輪中。相關代碼如下:
以前面5個定時任務為例來分析層級時間輪的工作方式。如圖6-66所示,當前時間為8毫秒時,第一層時間輪的buckets定時任務列表超時,會被延遲隊列彈出。在將定時任務列表中的定時任務重新加入第一層時間輪時,由于定時任務的失效時間小于當前時間加上tickMs=lr怡,所以加入失敗。
如圖6-67所示,當前時間為20毫秒時,第二層時間輪的bucketl定時任務列表超時,也會被延遲隊列彈:1:\。不同的是:在將定時任務列表中的定時任務重新加入第一層時間輪時,3個定時任務都還沒有失效。并且,它們都在第一層時間輪的范圍內,所以允許重新加入定時器的第一層時間輪中。
如圖6-68所示,最終第二層時間輪bucketl定時任務列表的3個定時任務都被降級后,加入到第一層時間輪3個不同的定時任務列表中,分別是[bucket5,bucket10,bucket10]。后續這3個定時任務的執行和圖6-66類似,一旦超時被延遲隊列彈出,再次加入定時器就會失敗,并且會立即執行定時任務,強制完成延遲的操作。
本節分析了延遲操作在延遲緩存和定時器中的生命周期,外部事件嘗試完成延遲緩存中的延遲操作,定時器會在延遲操作失效后強制完成延遲操作。清理器會定期地刪除延遲緩存中已經完成的延遲操作。
6.4 小結
本章主要分析了日志存儲、日志管理、副本管理器的具體實現。下面分別總結這3個知識點的一些要點。日志存儲會將消息集寫到底層的日志文件,它的主要概念有以下幾點。
- 一個日志(Log)有多個日志分段(LogSegment)。每個日志分段由數據文件(FileMessageSet)和索引文件(Offsetindex)組成。
- 偏移量是消息最重要的組成部分。每條消息寫入底層數據文件,者IS會有一個遞增的偏移量。
- 索引文件保存了消息偏移量到物理位置的映射關系,但并不是保存數據文件的所有消息,而是間隔一定數量的消息才保存一條映射關系。索引文件保存的偏移量是相對偏移鹽,數據文件中每條消息的偏移量是分區級別的絕對偏移量。
- 存儲索引文件的條目時,將絕對偏移量減去日志分段的基準偏移鹽。查詢索引文件返回的相對偏移量要加上基準偏移量,才能用于查詢數據文件。
- 客戶端每次讀取數據文件,服務端都會創建一個文件視圖,文件視圖和底層數據文件共用一個文件通道,但擁有不同的開始位置和結束位置。
- 服務端返回文件視圖給客戶端,采用零拷貝技術,將底層文件通道的數據直接傳輸到網絡通道。
日志管理器(LogMa_!1ager)管理了服務端的所有日志,除了上面對日志的追加和讀取操作外,日志管理還有下面幾個后臺管理的線程類。
- 定時將數據文件寫到磁盤上、定時將恢復點寫入檢查點文件。
- 日志清理線程根據日志的大小和時間清理最舊的日志分段。
- 日志壓縮線程將相同鍵的不同消息進行壓縮,壓縮線程將日志按照清理點分成頭部和尾部。
副本管理然(ReplicaManager)保存了服務端的所有分區,并處理客戶端發送的讀寫請求。
- 副本管理器處理讀寫請求,會先操作分區的主副本。appendMessages()方法會將消息集寫入主副本的本地臼志,fetchMessage()方法會從主副本的本地日志讀取消息集。
- 每個分區都有一個主副本和多個備份副本,只有本地副本才有日志對象。副本有兩個重要的位置信息:LEO表示副本的最新偏移量,HW表示副本的最高水位。
- 生產請求的應答值(acks)需要服務端創建延遲的生產(DelayedProduce),拉取請求的最少字節數(fetchMinBytes)需要服務端創建延遲的拉取(DelayedFetch)。
- 延遲緩存會記錄分區到延遲操作的映射關系,外部事件會根據分區嘗試完成延遲的操作。
- 延遲緩存有監視器、清理器、定時器協調完成延遲的操作。
在0.8版本以前,Kafka并沒有日志復制的特性,因此一旦消息代理節點掛掉,這個節點上的數據就會丟失。在0.8版本以后,Kafka提供了日志的副本機制,雖然只有主副本可以響應數據的讀寫請求,但是備份副本會向主副本中及時地同步數據。這樣,當主副本掛掉后,備份副本就可以選舉出新的主副本,并繼續響應客戶端的讀寫請求。下一章我們來分析服務端如何實現副本的復制特性。
總結
以上是生活随笔為你收集整理的6.3.3 延迟缓存的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VM技术(一)NES模拟器VM综述
- 下一篇: BetterAndBetter--Mac