redis之如何实现消息队列
寫在前面
本文一起來看下使用redis如何實(shí)現(xiàn)消息隊(duì)列的功能。目前在redis想要實(shí)現(xiàn)消息隊(duì)列的功能有如下的兩種方案:
1:基于List的lpush和rpop 2:Streams這里不將pub/sub考慮在內(nèi),因?yàn)槠洳痪邆涑志没哪芰?#xff0c;消息會(huì)丟失。
其中1是利用其有的先進(jìn)先出特性實(shí)現(xiàn),2是redis為了實(shí)現(xiàn)消息隊(duì)列專門在redis5版本中定義的一種新的數(shù)據(jù)結(jié)構(gòu),這里注意,其也是一種數(shù)據(jù)結(jié)構(gòu),和String,Set等處于同等位置的數(shù)據(jù)結(jié)構(gòu),只不過內(nèi)部增加了一些針對(duì)消息隊(duì)列的一些特有操作來實(shí)現(xiàn)消息隊(duì)列的功能,后續(xù)我們會(huì)詳細(xì)分析其用法。
1:消息隊(duì)列需要滿足哪些要求
- 消息保序
即要保證消息發(fā)送的順序和消費(fèi)的順序是一致的,不一致的話可能會(huì)導(dǎo)致業(yè)務(wù)上的錯(cuò)誤。 - 重復(fù)消息處理
消息的重復(fù)處理,準(zhǔn)確來說是應(yīng)該通過消費(fèi)者自身來實(shí)現(xiàn)的,在應(yīng)用程序內(nèi)部實(shí)現(xiàn)這些邏輯,但是對(duì)于消息中間件來說也要有此類機(jī)制,即對(duì)于一個(gè)已經(jīng)被消費(fèi)的消息(已經(jīng)收到ACK)不能再次被消費(fèi)。 - 消息可靠性
換句話說,要具有持久化的能力,避免消息丟失,這樣當(dāng)消費(fèi)者異常宕機(jī)導(dǎo)致再次重啟后需要重新消費(fèi)消息時(shí)可以再次獲取。
接下來我們先來看下使用List如何實(shí)現(xiàn)消息隊(duì)列。
2:List實(shí)現(xiàn)消息隊(duì)列
生產(chǎn)消息我們可以使用lpush,消費(fèi)消息可以使用rpop,如下圖生產(chǎn)和消費(fèi)消息的過程:
但是注意,這里我們使用rpop消費(fèi)消息時(shí),如果沒有消息則會(huì)直接返回, 且有新消息時(shí)也不會(huì)通知,此時(shí)就需要通過諸如while(true),for(;;)之類的死循環(huán)來不斷查詢,這樣就會(huì)不斷的消耗CPU資源,造成不必要的CPU資源浪費(fèi),為了解決這個(gè)問題,redis提供了rpop阻塞版本命令brpop,其中bblock就代表了阻塞的特性,當(dāng)有消息可以消費(fèi)時(shí),這樣就解決了浪費(fèi)CPU資源的問題。
到這里,對(duì)于消息中間件的3個(gè)需求,List都滿足哪些呢?首先第一個(gè)消息保序,是天然支持的,對(duì)于重復(fù)消息處理List本身是不支持的,但是我們?cè)谇懊嬉卜治隽?#xff0c;這準(zhǔn)確來說更應(yīng)該是消費(fèi)者本身來實(shí)現(xiàn)的,因此,生產(chǎn)者在生產(chǎn)消息時(shí)只要給每一個(gè)消息一個(gè)唯一的標(biāo)識(shí),然后消費(fèi)者通過此來避免消息重復(fù)處理就可以了,比如LPUSH mq "101030001:stock:5",其中的101030001就是其唯一標(biāo)識(shí)。最后是消息可靠性,為了滿足消息可靠性,redis進(jìn)一步對(duì)rpop進(jìn)行了增強(qiáng),提供了BRPOPLPUSH命令,消息彈出后,會(huì)重新壓到一個(gè)新的隊(duì)列中,這樣當(dāng)消費(fèi)者異常重啟后就可以從這里重新消費(fèi)消息了,如下圖:
到這里,List在一定程序上都滿足了消息中間件需要滿足的3個(gè)條件,這里還需要考慮另外一種情況,即當(dāng)消費(fèi)者的消費(fèi)能力嚴(yán)重不足,或者是生產(chǎn)者生產(chǎn)的消息量非常大,即生產(chǎn)能力遠(yuǎn)大于消費(fèi)能力的時(shí)候List就沒有什么辦法了,因?yàn)槠渲荒芤粋€(gè)一個(gè)的彈出消息,對(duì)于這個(gè)問題就需要依靠redis專門針對(duì)消息隊(duì)列的場(chǎng)景實(shí)現(xiàn)的新數(shù)據(jù)結(jié)構(gòu)Streams,注意這也是一種新的數(shù)據(jù)結(jié)構(gòu)。
3:Streams
我們前面說了,Streams是一種redis專門為消息隊(duì)列定義的一種數(shù)據(jù)結(jié)構(gòu),所以自然的我們是先要看如何定義這種數(shù)據(jù)結(jié)構(gòu)了,和其它的數(shù)據(jù)結(jié)構(gòu)一樣,我們不需要顯式的創(chuàng)建,在執(zhí)行第一次數(shù)據(jù)添加的時(shí)候自動(dòng)創(chuàng)建,添加數(shù)據(jù)的命令是XADD,語法格式是XADD key ID field value [field value ...],參數(shù)說明如下:
key:redis的key ID:消息的唯一標(biāo)識(shí),可以指定,也可以設(shè)置為*,設(shè)置為*時(shí)id會(huì)自動(dòng)生成,id是遞增 field value:消息的字段和值如下生產(chǎn)(創(chuàng)建)若干條消息:
redis> XADD mystream * name Sara surname OConnor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) "1601372323627-0"2) 1) "name"2) "Sara"3) "surname"4) "OConnor" 2) 1) "1601372323627-1"2) 1) "field1"2) "value1"3) "field2"4) "value2"5) "field3"6) "value3" redis>其中XLEN用來查看消息的個(gè)數(shù),XRANGE用來通過范圍查詢基于遞增ID獲取消息,-相當(dāng)于是負(fù)無窮,+相當(dāng)于是正無窮,即獲取所有消息。我們接著再來看下其它一些命令。
3.1:XDEL
根據(jù)ID刪除消息,如下測(cè)試:
> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-02) 1) "a"2) "1" 2) 1) 1538561701744-02) 1) "c"2) "3"3.2:XLEN
獲取消息的數(shù)量,語法格式xlen key,如下:
redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis>3.3:XRANGE
查詢指定范圍的消息,語法格式XRANGE key start end [COUNT count],解釋如下:
key :隊(duì)列名 start :開始值, - 表示最小值 end :結(jié)束值, + 表示最大值 count :數(shù)量測(cè)試如下:
redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) "1601372577811-0"2) 1) "name"2) "Virginia"3) "surname"4) "Woolf" 2) 1) "1601372577811-1"2) 1) "name"2) "Jane"3) "surname"4) "Austen" redis>3.4:XREVRANGE
從后往前獲取消息,語法格式XREVRANGE key end start [COUNT count],解釋如下:
key :隊(duì)列名 end :結(jié)束值, + 表示最大值 start :開始值, - 表示最小值 count :數(shù)量實(shí)例如下:
redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 1) 1) "1601372731459-3"2) 1) "name"2) "Ngozi"3) "surname"4) "Adichie"3.5:XREAD
以阻塞或者是非阻塞的方式獲取消息,即消費(fèi)消息的命令,語法格式XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...],解釋如下:
count :數(shù)量 milliseconds :可選,阻塞毫秒數(shù),沒有設(shè)置就是非阻塞模式 key :隊(duì)列名 id :消息 ID測(cè)試如下:
# 從 Stream 頭部讀取兩條消息 > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream"2) 1) 1) 1526984818136-02) 1) "duration"2) "1532"3) "event-id"4) "5"5) "user-id"6) "7782813"2) 1) 1526999352406-02) 1) "duration"2) "812"3) "event-id"4) "9"5) "user-id"6) "388234" 2) 1) "writers"2) 1) 1) 1526985676425-02) 1) "name"2) "Virginia"3) "surname"4) "Woolf"2) 1) 1526985685298-02) 1) "name"2) "Jane"3) "surname"4) "Austen"3.6:XGROUP CREATE
創(chuàng)建消費(fèi)者組,使用消費(fèi)者可以對(duì)消息進(jìn)行并發(fā)的消費(fèi),解決消費(fèi)者消費(fèi)能力不足的問題,語法格式為XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername],解釋如下:
key :隊(duì)列名稱,如果不存在就創(chuàng)建 groupname :組名。 $ : 表示從尾部開始消費(fèi),只接受新消息,當(dāng)前 Stream 消息會(huì)全部忽略。如下從頭開始消費(fèi):
XGROUP CREATE mystream consumer-group-name 0-0如下從尾部開始消費(fèi):
XGROUP CREATE mystream consumer-group-name $在實(shí)際的場(chǎng)景中我們可以通過設(shè)置多個(gè)消費(fèi)者組的不同開始消費(fèi)的位置來實(shí)現(xiàn)并發(fā)消費(fèi)的效果,此時(shí)可能如下圖:
圖中主要元素解釋如下:
每個(gè) Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時(shí)自動(dòng)創(chuàng)建。 Consumer Group :消費(fèi)組,使用 XGROUP CREATE 命令創(chuàng)建,一個(gè)消費(fèi)組有多個(gè)消費(fèi)者(Consumer)。 last_delivered_id :游標(biāo),每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。 pending_ids :消費(fèi)者(Consumer)的狀態(tài)變量,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。 pending_ids 記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認(rèn)字符)。3.7:XREADGROUP GROUP
讀取消費(fèi)者組中的消息,語法格式如下:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]解釋如下:
group :消費(fèi)組名 consumer :消費(fèi)者名。 count : 讀取數(shù)量。 milliseconds : 阻塞毫秒數(shù)。 key : 隊(duì)列名。 ID : 消息 ID。如下測(cè)試:
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >符號(hào)>標(biāo)識(shí)從第一條尚未被消費(fèi)的消息開始消費(fèi)。
寫在后面
參考文章列表:
Redis Stream 。
總結(jié)
以上是生活随笔為你收集整理的redis之如何实现消息队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 4.5 制作一寸相片
- 下一篇: ai将会怎样影响计算机的发展,就目前人工