用redis实现消息队列
2019獨角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
為什么需要消息隊列
系統(tǒng)中引入消息隊列機制是對系統(tǒng)一個非常大的改善。例如一個web系統(tǒng)中,用戶做了某項操作后需要發(fā)送郵件通知到用戶郵箱中。你可以使用同步方式讓用戶等待郵件發(fā)送完成后反饋給用戶,但是這樣可能會因為網(wǎng)絡(luò)的不確定性造成用戶長時間的等待從而影響用戶體驗。
有些場景下是不可能使用同步方式等待完成的,那些需要后臺花費大量時間的操作。例如極端例子,一個在線編譯系統(tǒng)任務(wù),后臺編譯完成需要30分鐘。這種場景的設(shè)計不可能同步等待后在回饋,必須是先反饋用戶隨后異步處理完成,再等待處理完成后根據(jù)情況再此反饋用戶與否。
另外適用消息隊列的情況是那些系統(tǒng)處理能力有限的情況下,先使用隊列機制把任務(wù)暫時存放起來,系統(tǒng)再一個個輪流處理掉排隊的任務(wù)。這樣在系統(tǒng)吞吐量不足的情況下也能穩(wěn)定的處理掉高并發(fā)的任務(wù)。
消息隊列可以用來做排隊機制,只要系統(tǒng)需要用到排隊機制的地方就可以使用消息隊列來作。
使用redis怎么做消息隊列
首先redis它的設(shè)計是用來做緩存的,但是由于它自身的某種特性使得他可以用來做消息隊列。它有幾個阻塞式的API可以使用,正是這些阻塞式的API讓他有做消息隊列的能力。
redis能做消息隊列得益于他list對象blpop brpop接口以及Pub/Sub(發(fā)布/訂閱)的某些接口。他們都是阻塞版的,所以可以用來做消息隊列。
Redis實現(xiàn)先進(jìn)先出隊列
Redis實現(xiàn)FIFO很容易,只需要一個List對象從頭取數(shù)據(jù),從尾部塞數(shù)據(jù)即可實現(xiàn)。例如lpush存數(shù)據(jù),brpop取數(shù)據(jù)。
Redis實現(xiàn)優(yōu)先級隊列
首先brpop和blpop是支持多l(xiāng)ist讀取的,比如brpop lista listb 0 命令就可以實現(xiàn)先從lista讀取數(shù)據(jù),讀取完lista的數(shù)據(jù)再去讀取listb的數(shù)據(jù)。
那么我們就可以通過如下方式實現(xiàn)了:
127.0.0.1:6379>?lpush?a?1 (integer)?1 127.0.0.1:6379>?lpush?a?2 (integer)?2 127.0.0.1:6379>?lpush?a?3 (integer)?3 127.0.0.1:6379>?lpush?b?1 (integer)?1 127.0.0.1:6379>?lpush?b?2 (integer)?2 127.0.0.1:6379>?lpush?b?3 (integer)?3 127.0.0.1:6379>?brpop?a?b?0 1)?"a" 2)?"1" 127.0.0.1:6379>?brpop?a?b?0 1)?"a" 2)?"2" 127.0.0.1:6379>?brpop?a?b?0 1)?"a" 2)?"3" 127.0.0.1:6379>?brpop?a?b?0 1)?"b" 2)?"1" 127.0.0.1:6379>?brpop?a?b?0 1)?"b" 2)?"2" 127.0.0.1:6379>?brpop?a?b?0 1)?"b" 2)?"3" 127.0.0.1:6379>?brpop?a?b?0這種方案我們可以支持不同階段的優(yōu)先級隊列,例如高中低三個級別或者更多的級別都可以。
?
多優(yōu)先級問題解決
如果優(yōu)先級級別很多的情況,假設(shè)有個這樣的需求,優(yōu)先級不是簡單的高中低或者0-10這些固定的級別。而是類似0-99999這么多級別。那么我們第三種方案將不太合適了。
雖然redis有sorted set這樣的可以排序的數(shù)據(jù)類型,看是很可惜它沒有阻塞版的接口。于是我們還是只能使用list類型通過其他方式來完成目的。
?
有個簡單的做法我們可以只設(shè)置一個隊列,并保證它是按照優(yōu)先級排序號的。然后通過二分查找法查找一個任務(wù)合適的位置,并通過 lset 命令插入到相應(yīng)的位置。?
例如隊列里面包含著寫優(yōu)先級的任務(wù)[1, 3, 6, 8, 9, 14],當(dāng)有個優(yōu)先級為7的任務(wù)過來,我們通過自己的二分算法一個個從隊列里面取數(shù)據(jù)出來反和目標(biāo)數(shù)據(jù)比對,計算出相應(yīng)的位置然后插入到指定地點即可。
因為二分查找是比較快的,并且redis本身也都在內(nèi)存中,理論上速度是可以保證的。但是如果說數(shù)據(jù)量確實很大的話我們也可以通過一些方式來調(diào)優(yōu)。
把上面的方案結(jié)合起來就會很大程度上減少開銷。例如數(shù)據(jù)量十萬的隊列,它們的優(yōu)先級也是隨機0-十萬的區(qū)間。我們可以設(shè)置 10個或者100個不同的隊列,0-一萬的優(yōu)先級任務(wù)投放到1號隊列,一萬-二萬的任務(wù)投放到2號隊列。這樣將一個隊列按不同等級拆分后它單個隊列的數(shù)據(jù) 就減少許多,這樣二分查找匹配的效率也會高一點。但是數(shù)據(jù)所占的資源基本是不變的,十萬數(shù)據(jù)該占多少內(nèi)存還是多少。只是系統(tǒng)里面多了一些隊列而已。
redis實現(xiàn)定時消息隊列
由于Redis排序集合(Sorted Sets)沒有實現(xiàn)阻塞功能,所以只能通過程序自己實現(xiàn)。score字段存入時間戳,由于時間戳較長我們用三位數(shù)字代替。
127.0.0.1:6379>?zadd?seta?100?a (integer)?1 127.0.0.1:6379>?zadd?seta?200?b (integer)?1 127.0.0.1:6379>?zadd?seta?300?c (integer)?1 127.0.0.1:6379>?zadd?seta?300?d (integer)?1
首先我們插入4條數(shù)據(jù)。
然后我們獲取0到當(dāng)前時間的數(shù)據(jù)。比如當(dāng)前時間戳為200,那么我們執(zhí)行如下命令
127.0.0.1:6379>?zrangebyscore?seta?0?200?limit?0?1 1)?"a" 127.0.0.1:6379>?zrem?seta?a (integer)?1 127.0.0.1:6379>?zrangebyscore?seta?0?201?limit?0?1 1)?"b" 127.0.0.1:6379>?zrem?seta?b (integer)?1 127.0.0.1:6379>?zrangebyscore?seta?0?202?limit?0?1 (empty?list?or?set)
如果取到空數(shù)據(jù),阻塞一段時間,然后繼續(xù)取數(shù)據(jù),循環(huán)執(zhí)行即可。
這里我們?yōu)槭裁礇]有采用zremrangebyscore命令而是采用zrangebyscore和zrem組合,因為zremrangebyscore沒有l(wèi)imit參數(shù),可能取到多行數(shù)據(jù)(例如兩個數(shù)據(jù)socore一樣等),由于并發(fā)問題可能導(dǎo)致zrem返回0,這樣也沒事,我們繼續(xù)取即可。
java代碼片段:
public?String?getData()?throws?Exception?{Jedis?jedis?=?getResource();while?(true)?{Set<String>?seta?=?jedis.zrangeByScore("seta",?0,?System.currentTimeMillis(),?0,?1);if?(seta?!=?null?&&?seta.size()?>?0)?{String?data?=?seta.toArray(new?String[]?{})[0];Long?res?=?jedis.zrem("seta",?data);if?(res?>?0)?{return?data;}}Thread.sleep(1000L);} }
來自個人博客:http://www.jflyfox.com/mtg/front/article/997.html
轉(zhuǎn)載于:https://my.oschina.net/flyoffox/blog/527860
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的用redis实现消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 四层负载均衡和七层负载均衡的区别
- 下一篇: BIEE连接数据库的方法