借助Redis完成延时任务
背景
相信我們或多或少的會(huì)遇到類似下面這樣的需求:
第三方給了一批數(shù)據(jù)給我們處理,我們處理好之后就通知他們處理結(jié)果。
大概就是下面這個(gè)圖說的。
本來在處理完數(shù)據(jù)之后,我們就會(huì)馬上把處理結(jié)果返回給對(duì)方,但是對(duì)方要求我們處理速度不能過快,要有一種人為處理的效果。
換句話就是說,就算是處理好了,也要晚一點(diǎn)再執(zhí)行通知操作。
這就是一個(gè)典型的延時(shí)任務(wù)。
延時(shí),那還不簡單,執(zhí)行完之后,讓它Sleep一下就好了,這樣就達(dá)到目標(biāo)了。
Sleep一下確定是最容易實(shí)現(xiàn)的一種方案,但是試想一下,數(shù)據(jù)的數(shù)量不斷的增加,這樣Sleep真的好嗎?答案是否定的。
延時(shí)隊(duì)列,是處理這個(gè)場(chǎng)景最為妥當(dāng)?shù)姆桨浮?/p>
RabbitMQ,RocketMQ,Cmq等都可以直接或間接的達(dá)到相應(yīng)的效果。
如果不具備隊(duì)列條件,又要怎么處理呢?還可以借助Redis來完成這項(xiàng)工作。
MQ不一定每個(gè)公司都會(huì)用,但Redis應(yīng)該80%以上的都會(huì)用吧。
處理方案
Redis這邊,可用的方案有兩種,下面分別來介紹一下。
#1 鍵的過期時(shí)間
在設(shè)置緩存的時(shí)候,我們比較多情況下都會(huì)設(shè)置一個(gè)緩存的過期時(shí)間,這個(gè)時(shí)間過期后,會(huì)重新去數(shù)據(jù)源拿數(shù)據(jù)回來。
可以基于這個(gè)過期時(shí)間結(jié)合Redis的keyspace notifications共同完成。
keyspace notifications里面包含了非常多的事件,這里只關(guān)注EXPIRE,這個(gè)是和過期有關(guān)的。
只要訂閱了__keyevent@0__:expired這個(gè)主題,當(dāng)有key過期的時(shí)候,就會(huì)收到對(duì)應(yīng)的信息。
主題@后面的0,指的是db 0.
要想使用這個(gè)特性,必不可少的一步是修改Redis默認(rèn)的配置,把notify-keyspace-events設(shè)置成Ex。
############################# Event notification ############################### Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/notifications # # ......... # # By default all notifications are disabled because most users don't need # this feature and the feature has some overhead. Note that if you don't # specify at least one of K or E, no events will be delivered. notify-keyspace-events "Ex"其中 E 指的是鍵事件通知,x 指的是過期事件。
根據(jù)這個(gè)特性,重新調(diào)整一下流程圖:
應(yīng)該也比較好懂,下面通過簡單的代碼來實(shí)現(xiàn)一下這種方案。
首先是處理完數(shù)據(jù)及往Redis寫數(shù)據(jù)。
public async Task DoTaskAsync() {// 數(shù)據(jù)處理// ...// 后續(xù)操作要延時(shí),把Id記錄下來var taskId = new Random().Next(1, 10000);// 要延遲的時(shí)間int sec = new Random().Next(1, 5);// 可以加個(gè)重試機(jī)制,預(yù)防單次執(zhí)行失敗。await RedisHelper.SetAsync($"task:{taskId}", "1", sec); }還需要回傳結(jié)果的后臺(tái)任務(wù),這個(gè)任務(wù)就是去訂閱上面說的鍵過期事件,然后回傳結(jié)果。
這里可以借助BackgroundService來訂閱處理。
public class SubscribeTaskBgTask : BackgroundService {protected override Task ExecuteAsync(CancellationToken stoppingToken){stoppingToken.ThrowIfCancellationRequested();var keyPrefix = "task:";RedisHelper.Subscribe(("__keyevent@0__:expired", arg =>{var msg = arg.Body;Console.WriteLine($"recive {msg}");if (msg.StartsWith(keyPrefix)){// 取到任務(wù)Idvar val = msg.Substring(keyPrefix.Length);Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");// 回傳處理結(jié)果給第三方,這里可以考慮這個(gè)并發(fā)鎖,避免多實(shí)例都處理了這個(gè)任務(wù)。// ....}}));return Task.CompletedTask;} }這里有一個(gè)要注意的地方,要在key里面包含任務(wù)的Id,因?yàn)橛嗛喬幚淼臅r(shí)候,只能拿到一個(gè)key,后續(xù)能做的操作也只是基于這個(gè)key。
上面的例子,是用了task:任務(wù)Id的形式,所以在訂閱處理的時(shí)候,只處理以task:開頭的那些key。
效果如下:
這種方案,直觀上是非常簡單的,不過這種方案會(huì)遇到一個(gè)小問題。
當(dāng)一個(gè)key過期后,并不一定會(huì)馬上收到通知,這個(gè)也是會(huì)有一定的延時(shí)的,取決于Redis的內(nèi)部機(jī)制。
Redis Keyspace Notifications文檔的最后一段也提到了這個(gè)問題。
所以用這種方案的時(shí)候,要考慮一下,你的延時(shí)是不是要及時(shí)~~
#2 有序集合
有序集合是Redis中一種十分有用的數(shù)據(jù)結(jié)構(gòu),它的本質(zhì)其實(shí)就是集合加了一個(gè)排序的功能,每個(gè)集合里面的元素還會(huì)有一個(gè)分值的屬性。
它提供了一個(gè)可以獲取指定分值范圍內(nèi)的元素,這個(gè)也就是我們的出發(fā)點(diǎn)。
在這個(gè)場(chǎng)景下,什么東西可能作為這個(gè)分值呢?現(xiàn)在只有一個(gè)處理任務(wù)的Id還有一個(gè)延遲的時(shí)間,Id肯定不行,那么也只能是延遲時(shí)間來作這個(gè)分值了。
延遲1秒,5秒,1分鐘,這個(gè)都是比較大粒度的時(shí)間,這里要轉(zhuǎn)化一下,用時(shí)間戳來代替這些延遲的時(shí)間。
假設(shè)現(xiàn)在的時(shí)間戳是 1584171520, 要延遲5秒執(zhí)行,那么執(zhí)行任務(wù)的時(shí)間就是 1584171525,在當(dāng)前時(shí)間戳的基礎(chǔ)上加個(gè)5秒,就是最終要執(zhí)行的了。
到時(shí)有序集合中存的元素就會(huì)是這樣的
任務(wù)Id-1 1584171525 任務(wù)Id-2 1584171528 任務(wù)Id-3 1584171530接下來就是要怎么取出這些任務(wù)的問題了!
把當(dāng)前時(shí)間戳當(dāng)成是取數(shù)的最大分值,0作為最小分值,這個(gè)時(shí)候取出的元素就是應(yīng)該要執(zhí)行回傳的任務(wù)了。
根據(jù)這個(gè)方案,重新調(diào)整一下流程圖:
交代清楚了思路,再來點(diǎn)代碼,加深一下理解。
首先還是處理完數(shù)據(jù)后往Redis寫數(shù)據(jù)。
public async Task DoTaskAsync() {// 數(shù)據(jù)處理// ...// 后續(xù)操作要延時(shí),把Id記錄下來var taskId = new Random().Next(1, 10000);var cacheKey = "task:delay";int sec = new Random().Next(1, 5);// 要執(zhí)行這個(gè)任務(wù)的時(shí)間戳var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();await RedisHelper.ZAddAsync(cacheKey, (time, taskId));Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}"); }后面就是輪訓(xùn)有序集合里面的元素了,這里同樣是借助BackgroundService來處理。
public class SubscribeTaskBgTask : BackgroundService {protected override async Task ExecuteAsync(CancellationToken stoppingToken){stoppingToken.ThrowIfCancellationRequested();var cacheKey = "task:delay";while (true){// 先取,后刪,不具備原子性,可考慮用lua腳本來保證原子性。var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);if (vals != null && vals.Length > 0){var val = vals[0];var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals);if (rmCount > 0){// 要把這個(gè)元素先刪除成功了,再執(zhí)行任務(wù),不然會(huì)重復(fù)Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");// 回傳處理結(jié)果給第三方,這里可以考慮這個(gè)并發(fā)鎖,避免多實(shí)例都處理了這個(gè)任務(wù)。// ....}}else{// 沒有數(shù)據(jù),休眠500ms,避免CPU空轉(zhuǎn)await Task.Delay(500);}}} }效果如下:
參考文章
https://redis.io/topics/notifications
https://zhuanlan.zhihu.com/p/87113913
總結(jié)
以上是生活随笔為你收集整理的借助Redis完成延时任务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [半翻] 设计面向DDD的微服务
- 下一篇: 多亏我缓存技术过硬!疫情防控项目上线,我