kafka 延迟队列
最近在看Kafka延遲隊列的實現方式,發現大部分講的都很片面,都是時間輪相關的東西,搞得一知半解的,最終根據自己的理解,設計了一套延遲隊列,和大家一起討論一下,服務流程如下
?如圖所示,所有的消息進來之后,都會被分配到delay隊列中,然后delay隊列消費消息滿足時間要求后再發送到業務隊列中,這樣做的目的是避免消息阻塞,如果我們沒有delay隊列,所有消息都在業務隊列中,那必然會產生一定的堆積,因為這個隊列本身要做的事情太多,delay隊列就是為了分擔他的壓力
這里說下為什么有三個delay隊列,我這里其實是想根據業務劃分優先級,也是為了可以減少消息的延遲,將數據做了歸類,比如延遲1分鐘左右的數據,放在高優先級的隊列中, 10分鐘延遲的放在中優先級,2小時延遲的放在低優先級
具體的操作方式:每個delay隊列中的消息,不僅要存當前消息的內容,還要存下一個要消費的消息位置(offet),這樣就可以避免我們以 O(n)的復雜度去遍歷隊列,檢查要執行的隊列數據,另外考慮到有新增數據插隊的情況,需要在緩存中也維護一份當前最優先的offet值,方便我們做插隊處理
kafka中可以修改offet值,通過seek() 函數即可
這里還要考慮一個問題,就是當最優先的隊列數據還有1小時才要執行,那我們怎么處理,是sleep嗎?如果sleep太久的話,當程序代碼不能在max.poll.interval.ms配置的期望時間內處理這些消息的話,kafka就會認為這個消費者已經掛了,會進行rebalance,同時你這個消費者就無法再拉取到任何消息了,Kafka本身提供很優雅的解決方案,pause() 方法可以暫定消費,resume() 方法可以恢復消費,這樣就不會出現異常了
總結
以上是生活随笔為你收集整理的kafka 延迟队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 3D游戏编程与设计作业6-Unity实现
- 下一篇: Day532533.Python基础 -