Redis实现消息队列之生产消费模式
簡單的介紹下消息隊列,使用消息隊列首先我們得有一個隊列,那么這個隊列之前講過就是先進先出的一個數據結構;那么有了隊列以后我們還需要有人在隊列里面放東西,那么這個放東西的人我們稱之為生產者;有了生產者對應的需要一個消費者,沒有消費者這個隊列滿了就會溢出。
簡單隊列實現
那么我們Redis剛好有一個數據類型符合這個就是List。list可以實現隊列(先進先出)和棧(先進后出),那么這個list又有兩種插入數據的方式:頭插法和尾插法。所以我們今天使用的結構是隊列,使用尾插法,關鍵的命令有rpush(尾插)和lpop(頭部獲取)。如下圖所示:
>?rpush?squeue?zhangsan?lisi?mango????#插入隊列(integer) 3>?lpop?squeue????#獲取隊列"zhangsan">?rpush?squeue?wangwu(integer) 3> lpop squeue"lisi"> lpop squeue"mango"> lpop squeue"wangwu">?lpop?squeue????#空隊列(nil)上面是rpush/lpop結合使用的例子。還可以使用lpush/rpop結合使用,效果是一樣的。
注意:我們這里思考一個問題,在咱們實際開發中我們獲取隊列數據的時候如果這個隊列里面沒有任何值了,我們會一直pop,這樣我們的程序出現了一個死循環,而且此時的redis會不斷的處理服務器的pop指令使之內存增高。
此時mango靈光一閃,每次pop的時候判斷,如果隊列有值我們獲取這個值進行操作,如果隊列是空隊列,那么我們此時休息三秒鐘再請求,emmm,加雞腿。
>?lpop?squeue????#空隊列(nil).......sleep?3s?old> lpop squeue...隊列阻塞
通過后端程序控制redis服務器休息時間是一個好辦法,但是此處有一個問題,如果說服務器在休眠的時候隊列突然進來一個值,而此時需要及時反應獲取這個值該如何實現呢?
當然,redis早就考慮到這個問題,so提供了一個叫做隊列阻塞讀,其命令blpop和brpop,就是lpop和rpop的阻塞讀方法
blpop [第一個參數:key]... [第二個參數:time]key可以有多個key等待time就是阻塞時間,單位默認為秒嗯,看似完美的解決了上面的方法
問題又來了,如果說此處設置的時間稍微長一點,阻塞請求的客戶端連接再多一點那么會出現下一個問題,那就是空閑連接問題。如果線程一直阻塞在哪里,Redis的客戶端連接就成了閑置連接,閑置過久,服務器一般會主動斷開連接,減少閑置資源占用。這個時候blpop/brpop會拋出異常來。
所以,魚與熊掌不可兼得,開發者當注意此處需要捕獲異常,然后重新請求。
延遲隊列
上面我們介紹了阻塞隊列,深知空閑連接會被回收出現異常問題,那么我們可不可以實現延遲隊列,我提前一段時間比如5秒獲取隊列中的元素,當元素記錄的時間到達了我再去執行這個值,然后又提前5秒時間去獲取隊列中的值,依次反復。即可保證我在某一時刻去執行隊列中對應時間的值。
我們來分析,首先保證隊列是一個有序的我們才能依次執行,這里我們使用ZSet因為它帶有排序且不重復,保證客戶端沒有提交重復數據,那么值保證了,這個排序如何設計呢?我們不能使用'yyyyMMddHHmmssSSS'這種,第一個不適應這個排序類型可能會超出,第二就是每次轉換會帶來運算轉換的消耗,所有這里我們使用時間戳。那么zset提供一個獲取某段存在數據指令zrangebyscore,這個指令能夠獲取到我們想要的數據,然后通過zrem刪除zset里面的值即可完成消費。
####假設當前時間戳是0>?zadd?dqueue?4?zhangsan?8?lisi?12?mango????#添加元素(integer) 3####提前獲取后5秒的數據>?zrangebyscore?dqueue?-inf?5?withscores????#提前獲取5秒內的數據1)?"zhangsan"????#值2)?4.0???????????#當前排序索引>?zrem?dqueue?zhangsan?????#消費后刪除元素1####當前時間戳來到了5秒,提前5秒獲取就加上這個>?zrangebyscore?dqueue?-inf?10?withscores????#獲取10以內的數據,輪訓調用1) "lisi"2) 8.0寫在最后:
我們首先實現簡單的生成消費模式,針對這種簡單輪詢我們通過有數據和無數據讓這個輪詢實現休息策略來優化,需求更改需要及時響應生產者的數據我們使用了阻塞隊列來減少響應延時,通過分析我們又發現阻塞隊列會被回收的問題,我們又雙叒叕進行了優化,通過zset來實現延遲隊列。
你以為這樣就萬無一失了,其實還存在一個問題那就是不能保證原子性,我們的zrangebyscore和zrem不能在同一原子內執行,這里只是針對的一個客戶端,如果有多個客戶端執行那就會出現多次消費問題,也就是資源爭搶,這里我們可以使用lua腳本來執行保證原子性,即獲取后刪除,其值和時間戳保存到內存中,通過后端程序控制時間消費。當然,問題還會有的,我們在下一篇來講解Redis消息隊列的發布訂閱模式。
?
一名正在搶救的coder
筆名:mangolove
CSDN地址:https://blog.csdn.net/mango_love
GitHub地址:https://github.com/mangoloveYu
總結
以上是生活随笔為你收集整理的Redis实现消息队列之生产消费模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux内核编译与管理
- 下一篇: 《C++游戏开发》十八 角色在障碍物中智