延时消息_手把手实现一条延时消息
前言
近期在維護(hù)公司的調(diào)度平臺(tái),其中有個(gè)關(guān)鍵功能那就是定時(shí)任務(wù);定時(shí)任務(wù)大家平時(shí)肯定接觸的不少,比如 JDK 中的 Timer、ScheduledExecutorService、調(diào)度框架 Quartz 等。
通常用于實(shí)現(xiàn) XX 時(shí)間后的延時(shí)任務(wù),或周期性任務(wù);
比如一個(gè)常見(jiàn)的業(yè)務(wù)場(chǎng)景:用戶(hù)下單 N 分鐘未能支付便自動(dòng)取消訂單。
實(shí)現(xiàn)這類(lèi)需求通常有兩種方式:
- 輪詢(xún)定時(shí)任務(wù):給定周期內(nèi)掃描所有未支付的訂單,查看時(shí)間是否到期。
- 延時(shí)消息:訂單創(chuàng)建的時(shí)候發(fā)送一條 N 分鐘到期的信息,一旦消息消費(fèi)后便可判斷訂單是否可以取消。
先看第一種,這類(lèi)方式實(shí)現(xiàn)較為簡(jiǎn)單,只需要啟動(dòng)一個(gè)定時(shí)任務(wù)即可;但缺點(diǎn)同樣也很明顯,這個(gè)間隔掃描的時(shí)間不好控制。
給短了會(huì)造成很多無(wú)意義的掃描,增大數(shù)據(jù)庫(kù)壓力,給長(zhǎng)了又會(huì)使得誤差較大。
當(dāng)然最大的問(wèn)題還是效率較低,隨著訂單增多耗時(shí)會(huì)呈線(xiàn)性增長(zhǎng),最差的情況甚至?xí)霈F(xiàn)上一波輪詢(xún)還沒(méi)有掃描完,下一波調(diào)度又來(lái)了。
這時(shí)第二種方案就要顯得靠譜多了,通過(guò)延時(shí)消息可以去掉不必要的訂單掃描,實(shí)時(shí)性也比較高。
延時(shí)消息
這里我們不過(guò)多討論這類(lèi)需求如何實(shí)現(xiàn);重點(diǎn)聊聊這個(gè)延時(shí)消息,看它是如何實(shí)現(xiàn)的,基于實(shí)現(xiàn)延時(shí)消息的數(shù)據(jù)結(jié)構(gòu)還能實(shí)現(xiàn)定時(shí)任務(wù)。
我在之前的開(kāi)源 IM 項(xiàng)目中也加入了此類(lèi)功能,可以很直觀(guān)的發(fā)送一條延時(shí)消息,效果如下:
使用 :delay hahah 2 發(fā)送了一條兩秒鐘的延時(shí)消息,另外一個(gè)客戶(hù)端將會(huì)在兩秒鐘之后收到該消息。
具體的實(shí)現(xiàn)步驟會(huì)在后文繼續(xù)分析。
時(shí)間輪
要實(shí)現(xiàn)延時(shí)消息就不得不提到一種數(shù)據(jù)結(jié)構(gòu)【時(shí)間輪】,時(shí)間輪聽(tīng)這名字可以很直觀(guān)的抽象出它的數(shù)據(jù)結(jié)構(gòu)。
其實(shí)本質(zhì)上它就是一個(gè)環(huán)形的數(shù)組,如圖所示,假設(shè)我們創(chuàng)建了一個(gè)長(zhǎng)度為 8 的時(shí)間輪。
task0 = 當(dāng)我們需要新建一個(gè) 5s 延時(shí)消息,則只需要將它放到下標(biāo)為 5 的那個(gè)槽中。
task1 = 而如果是一個(gè) 10s 的延時(shí)消息,則需要將它放到下標(biāo)為 2 的槽中,但同時(shí)需要記錄它所對(duì)應(yīng)的圈數(shù),不然就和 2 秒的延時(shí)消息重復(fù)了。
task2= 當(dāng)創(chuàng)建一個(gè) 21s 的延時(shí)消息時(shí),它所在的位置就和 task0 相同了,都在下標(biāo)為 5 的槽中,所以為了區(qū)別需要為他加上圈數(shù)為 2。
通過(guò)這張圖可以更直觀(guān)的理解。
當(dāng)我們需要取出延時(shí)消息時(shí),只需要每秒往下移動(dòng)這個(gè)指針,然后取出該位置的所有任務(wù)即可。
當(dāng)然取出任務(wù)之前還得判斷圈數(shù)是否為 0 ,不為 0 時(shí)說(shuō)明該任務(wù)還得再輪幾圈,同時(shí)需要將圈數(shù) -1 。
這樣就可避免輪詢(xún)所有的任務(wù),不過(guò)如果時(shí)間輪的槽比較少,導(dǎo)致某一個(gè)槽上的任務(wù)非常多那效率也比較低,這就和 HashMap 的 hash 沖突是一樣的。
編碼實(shí)現(xiàn)
理論講完后我們來(lái)看看實(shí)際的編碼實(shí)現(xiàn),為此我創(chuàng)建了一個(gè) RingBufferWheel 類(lèi)。
它的主要功能如下:
- 可以添加指定時(shí)間的延時(shí)任務(wù),在這個(gè)任務(wù)中可以實(shí)現(xiàn)自己的業(yè)務(wù)邏輯。
- 停止運(yùn)行(包含強(qiáng)制停止和所有任務(wù)完成后停止)。
- 查看待執(zhí)行任務(wù)數(shù)量。
首先直接看看這個(gè)類(lèi)是如何使用的。
我在這里創(chuàng)建了 65 個(gè)延時(shí)任務(wù),每個(gè)任務(wù)都比前一個(gè)延后 1s 執(zhí)行;同時(shí)自定義了一個(gè) Job 類(lèi)來(lái)實(shí)現(xiàn)自己的業(yè)務(wù)邏輯,最后調(diào)用 stop(false) 會(huì)在所有任務(wù)執(zhí)行完畢后退出。
構(gòu)造函數(shù)
先來(lái)看看其中的構(gòu)造函數(shù),這里一共有兩個(gè)構(gòu)造函數(shù),用于接收一個(gè)線(xiàn)程池及時(shí)間輪的大小。
線(xiàn)程池的作用會(huì)在后面講到。
這里的時(shí)間輪大小也是有講究的,它的長(zhǎng)度必須得是 2∧n,至于為什么有這個(gè)要求后面也會(huì)講到。
默認(rèn)情況下會(huì)初始化一個(gè)長(zhǎng)度為 64 的數(shù)組。
添加任務(wù)
下面來(lái)看看添加任務(wù)的邏輯,根據(jù)我們之前的那張抽象圖其實(shí)很容易實(shí)現(xiàn)。
首先我們要定義一個(gè) Task 類(lèi),用于抽象任務(wù);它本身也是一個(gè)線(xiàn)程,一旦延時(shí)到期便會(huì)執(zhí)行其中的 run 函數(shù),所以使用時(shí)便可繼承該類(lèi),將業(yè)務(wù)邏輯寫(xiě)在 run() 中即可。
它其中還有兩個(gè)成員變量,也很好理解。
- cycleNum 用于記錄該任務(wù)所在時(shí)間輪的圈數(shù)。
- key 在這里其實(shí)就是延時(shí)時(shí)間。
//通過(guò) key 計(jì)算應(yīng)該存放的位置 private Set get(int key) { int index = mod(key, bufferSize); return (Set) ringBuffer[index]; } private int mod(int target, int mod) { // equals target % mod target = target + tick.get() ; return target & (mod - 1); }
首先是根據(jù)延時(shí)時(shí)間 (key) 計(jì)算出所在的位置,其實(shí)就和 HashMap 一樣的取模運(yùn)算,只不過(guò)這里使用了位運(yùn)算替代了取模,同時(shí)效率會(huì)高上不少。
這樣也解釋了為什么數(shù)組長(zhǎng)度一定得是 2∧n。
然后查看該位置上是否存在任務(wù),不存在就新建一個(gè);存在自然就是將任務(wù)寫(xiě)入這個(gè)集合并更新回去。
private int cycleNum(int target, int mod) { //equals target/mod return target >> Integer.bitCount(mod - 1); }其中的 cycleNum() 自然是用于計(jì)算該任務(wù)所處的圈數(shù),也是考慮到效率問(wèn)題,使用位運(yùn)算替代了除法。
private void put(int key, Set tasks) { int index = mod(key, bufferSize); ringBuffer[index] = tasks; }而 put() 函數(shù)就非常簡(jiǎn)單了,就是將任務(wù)寫(xiě)入指定數(shù)組下標(biāo)即可。
啟動(dòng)時(shí)間輪
任務(wù)寫(xiě)進(jìn)去后下一步便是啟動(dòng)這個(gè)時(shí)間輪了,我這里定義了一個(gè) start() 函數(shù)。
其實(shí)本質(zhì)上就是開(kāi)啟了一個(gè)后臺(tái)線(xiàn)程來(lái)做這個(gè)事情:
它會(huì)一直從時(shí)間輪中取出任務(wù)來(lái)運(yùn)行,而運(yùn)行這些任務(wù)的線(xiàn)程便是我們?cè)诔跏蓟瘯r(shí)傳入的線(xiàn)程池;所以所有的延時(shí)任務(wù)都是由自定義的線(xiàn)程池調(diào)度完成的,這樣可以避免時(shí)間輪的阻塞。
這里調(diào)用的 remove(index) 很容易猜到是用于獲取當(dāng)前數(shù)組中的所有任務(wù)。
邏輯很簡(jiǎn)單就不再贅述,不過(guò)其中的 size2Notify() 倒是值得說(shuō)一下。
他是用于在停止任務(wù)時(shí),主線(xiàn)程等待所有延時(shí)任務(wù)執(zhí)行完畢的喚醒條件。這類(lèi)用法幾乎是所有線(xiàn)程間通信的常規(guī)套路,值得收入技能包。
停止時(shí)間輪
剛才提到的喚醒主線(xiàn)程得配合這里的停止方法使用:
如果是強(qiáng)制停止那便什么也不管,直接更新停止標(biāo)志,同時(shí)關(guān)閉線(xiàn)程池即可。
但如果是軟停止(等待所有任務(wù)執(zhí)行完畢)時(shí),那就得通過(guò)上文提到的方式阻塞主線(xiàn)程,直到任務(wù)執(zhí)行完畢后被喚醒。
CIM 中的應(yīng)用
介紹了核心原理和基本 API 后,我們來(lái)看看實(shí)際業(yè)務(wù)場(chǎng)景如何結(jié)合使用(背景是一個(gè)即時(shí)通訊項(xiàng)目)。
我這里所使用的場(chǎng)景在文初也提到了,就是真的發(fā)送一條延時(shí)消息;
現(xiàn)有的消息都是實(shí)時(shí)消息,所以要實(shí)現(xiàn)一個(gè)延時(shí)消息便是在現(xiàn)有的發(fā)送客戶(hù)端處將延時(shí)消息放入到這個(gè)時(shí)間輪中,在任務(wù)到期時(shí)再執(zhí)行真正的消息發(fā)送邏輯。
由于項(xiàng)目本身結(jié)合了 Spring,所以第一步自然是配置 bean。
bean 配置好后其實(shí)就可以使用了。
每當(dāng)發(fā)送的是延時(shí)消息時(shí),只需要將這個(gè)消息封裝為一個(gè) Job 放到時(shí)間輪中,然后在自己的業(yè)務(wù)類(lèi)中完成業(yè)務(wù)即可。
后續(xù)可以?xún)?yōu)化下 api,不用每次新增任務(wù)都要調(diào)用 start() 方法。
這樣一個(gè)延時(shí)消息的應(yīng)用便完成了。
總結(jié)
時(shí)間輪這樣的應(yīng)用還非常多,比如 Netty 中的 HashedWheelTimer 工具原理也差不多,可以用于維護(hù)長(zhǎng)連接心跳信息。
甚至 Kafka 在這基礎(chǔ)上還優(yōu)化出了層級(jí)時(shí)間輪,這些都是后話(huà)了,大家感興趣的話(huà)可以自行搜索資料或者抽時(shí)間我再完善一次。
這篇文章從前期準(zhǔn)備到擼碼實(shí)現(xiàn)還是花了不少時(shí)間,如果對(duì)你有幫助的話(huà)還請(qǐng)點(diǎn)贊轉(zhuǎn)發(fā)。
本文的所有源碼都可在此處查閱:
https://github.com/crossoverJie/cim
你的點(diǎn)贊與分享是對(duì)我最大的支持
原文:https://my.oschina.net/crossoverjie/blog/3111640
總結(jié)
以上是生活随笔為你收集整理的延时消息_手把手实现一条延时消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: apache camel 相关配置_My
- 下一篇: C++实现中值滤波算法