Redis 中的发布/订阅功能
發布/ 訂閱系統 是 Web 系統中比較常用的一個功能。簡單點說就是 發布者發布消息,訂閱者接受消息,這有點類似于我們的報紙/ 雜志社之類的:
雖然可以使用一個 list 列表結構結合 lpush 和 rpop 來實現消息隊列的功能,但是似乎很難實現實現 消息多播 的功能:
為了支持消息多播,Redis 不能再依賴于那 5 種基礎的數據結構了,它單獨使用了一個模塊來支持消息多播,這個模塊就是 PubSub,也就是 PublisherSubscriber (發布者/ 訂閱者模式)。
PubSub 簡介
我們從 上面的圖 中可以看到,基于 list 結構的消息隊列,是一種 Publisher 與 Consumer 點對點的強關聯關系,Redis 為了消除這樣的強關聯,引入了另一種概念:頻道 (channel):
當 Publisher 往 channel 中發布消息時,關注了指定 channel 的 Consumer 就能夠同時受到消息。但這里的 問題 是,消費者訂閱一個頻道是必須 明確指定頻道名稱 的,這意味著,如果我們想要 訂閱多個 頻道,那么就必須 顯式地關注多個 名稱。
為了簡化訂閱的繁瑣操作,Redis 提供了 模式訂閱 的功能 Pattern Subscribe,這樣就可以 一次性關注多個頻道 了,即使生產者新增了同模式的頻道,消費者也可以立即受到消息:
例如上圖中,所有 位于圖片下方的 Consumer 都能夠收到消息。
Publisher 往 wmyskxz.chat 這個 channel 中發送了一條消息,不僅僅關注了這個頻道的 Consumer 1 和 Consumer 2 能夠受到消息,
圖片中的兩個 channel 都和模式 wmyskxz.* 匹配,所以 Redis 此時會同樣發送消息給訂閱了 wmyskxz.* 這個模式的 Consumer 3 和關注了在這個模式下的另一個頻道 wmyskxz.log 下的 Consumer 4 和 Consumer 5。
另一方面,如果接收消息的頻道是 wmyskxz.chat,那么 Consumer 3 也會收到消息。
快速體驗
在 Redis 中,PubSub 模塊的使用非常簡單,常用的命令也就下面這么幾條:
訂閱頻道:
SUBSCRIBE channel [channel …] # 訂閱給定的一個或多個頻道的信息
PSUBSCRIBE pattern [pattern …] # 訂閱一個或多個符合給定模式的頻道# 發布頻道:
PUBLISH channel message # 將消息發送到指定的頻道# 退訂頻道:
UNSUBSCRIBE [channel [channel …]] # 退訂指定的頻道
PUNSUBSCRIBE [pattern [pattern …]] #退訂所有給定模式的頻道
我們可以在本地快速地來體驗一下 PubSub:
具體步驟如下:
1.開啟本地 Redis 服務,新建兩個控制臺窗口;
2.在其中一個窗口輸入 SUBSCRIBE wmyskxz.chat 關注 wmyskxz.chat 頻道,讓這個窗口成為 消費者。
3.在另一個窗口輸入 PUBLISH wmyskxz.chat ‘message’ 往這個頻道發送消息,這個時候就會看到 另一個窗口實時地出現 了發送的測試消息。
實現原理
可以看到,我們通過很簡單的兩條命令,幾乎就可以簡單使用這樣的一個 發布/ 訂閱系統 了,但是具體是怎么樣實現的呢?
每個 Redis 服務器進程維持著一個標識服務器狀態 的 redis.h/redisServer 結構,其中就 保存著有訂閱的頻道 以及 訂閱模式 的信息:
struct redisServer {// ...dict *pubsub_channels; // 訂閱頻道list *pubsub_patterns; // 訂閱模式// ... };訂閱頻道原理
當客戶端訂閱某一個頻道之后,Redis 就會往 pubsub_channels 這個字典中新添加一條數據,實際上這個 dict 字典維護的是一張鏈表,比如,下圖展示的 pubsub_channels 示例中,client 1、client 2 就訂閱了 channel 1,而其他頻道也分別被其他客戶端訂閱:
SUBSCRIBE 命令
SUBSCRIBE 命令的行為可以用下列的偽代碼表示:
def SUBSCRIBE(client, channels):# 遍歷所有輸入頻道for channel in channels:# 將客戶端添加到鏈表的末尾redisServer.pubsub_channels[channel].append(client)UNSUBSCRIBE 命令
使用 UNSUBSCRIBE 命令可以退訂指定的頻道,這個命令執行的是訂閱的反操作:它從 pubsub_channels 字典的給定頻道(鍵)中,刪除關于當前客戶端的信息,這樣被退訂頻道的信息就不會再發送給這個客戶端。
訂閱模式原理
正如我們上面說到了,當發送一條消息到 wmyskxz.chat 這個頻道時,Redis 不僅僅會發送到當前的頻道,還會發送到匹配于當前模式的所有頻道,實際上,pubsub_patterns 背后還維護了一個 redis.h/pubsubPattern 結構:
每當調用 PSUBSCRIBE 命令訂閱一個模式時,程序就創建一個包含客戶端信息和被訂閱模式的 pubsubPattern 結構,并將該結構添加到 redisServer.pubsub_patterns 鏈表中。
我們來看一個 pusub_patterns 鏈表的示例:
這個時候客戶端 client 3 執行 PSUBSCRIBE wmyskxz.java.*,那么 pubsub_patterns 鏈表就會被更新成這樣:
通過遍歷整個 pubsub_patterns 鏈表,程序可以檢查所有正在被訂閱的模式,以及訂閱這些模式的客戶端。
PUBLISH 命令
上面給出的偽代碼并沒有 完整描述 PUBLISH 命令的行為,因為 PUBLISH 除了將 message 發送到 所有訂閱 channel 的客戶端 之外,它還會將 channel 和 pubsub_patterns 中的 模式 進行對比,如果 channel 和某個模式匹配的話,那么也將 message 發送到 訂閱那個模式的客戶端。
完整描述 PUBLISH 功能的偽代碼定于如下:
PUNSUBSCRIBE 命令
使用 PUNSUBSCRIBE 命令可以退訂指定的模式,這個命令執行的是訂閱模式的反操作:序會刪除 redisServer.pubsub_patterns 鏈表中,所有和被退訂模式相關聯的 pubsubPattern 結構,這樣客戶端就不會再收到和模式相匹配的頻道發來的信息。
PubSub 的缺點
盡管 Redis 實現了 PubSub 模式來達到了 多播消息隊列 的目的,但在實際的消息隊列的領域,幾乎 找不到特別合適的場景,因為它的缺點十分明顯:
?沒有 Ack 機制,也不保證數據的連續: PubSub 的生產者傳遞過來一個消息,Redis 會直接找到相應的消費者傳遞過去。如果沒有一個消費者,那么消息會被直接丟棄。如果開始有三個消費者,其中一個突然掛掉了,過了一會兒等它再重連時,那么重連期間的消息對于這個消費者來說就徹底丟失了。
?不持久化消息: 如果 Redis 停機重啟,PubSub 的消息是不會持久化的,畢竟 Redis 宕機就相當于一個消費者都沒有,所有的消息都會被直接丟棄。
基于上述缺點,Redis 的作者甚至單獨開啟了一個 Disque 的項目來專門用來做多播消息隊列,不過該項目目前好像都沒有成熟。不過后來在 2018 年 6 月,Redis 5.0 新增了 Stream 數據結構,這個功能給 Redis 帶來了 持久化消息隊列,從此 PubSub 作為消息隊列的功能可以說是就消失了…
二、更為強大的 Stream | 持久化的發布/訂閱系統
Redis Stream 從概念上來說,就像是一個 僅追加內容 的 消息鏈表,把所有加入的消息都一個一個串起來,每個消息都有一個唯一的 ID 和內容,這很簡單,讓它復雜的是從 Kafka 借鑒的另一種概念:消費者組(Consumer Group) (思路一致,實現不同):
上圖就展示了一個典型的 Stream 結構。每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創建。我們對圖中的一些概念做一下解釋:
?Consumer Group:消費者組,可以簡單看成記錄流狀態的一種數據結構。消費者既可以選擇使用 XREAD 命令進行 獨立消費,也可以多個消費者同時加入一個消費者組進行 組內消費。同一個消費者組內的消費者共享所有的 Stream 信息,同一條消息只會有一個消費者消費到,這樣就可以應用在分布式的應用場景中來保證消息的唯一性。
**?last_delivered_id:**用來表示消費者組消費在 Stream 上 消費位置 的游標信息。每個消費者組都有一個 Stream 內 唯一的名稱,消費者組不會自動創建,需要使用 XGROUP CREATE 指令來顯式創建,并且需要指定從哪一個消息 ID 開始消費,用來初始化 last_delivered_id 這個變量。
?pending_ids:每個消費者內部都有的一個狀態變量,用來表示 已經 被客戶端 獲取,但是 還沒有 ack 的消息。記錄的目的是為了 保證客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失而沒有對消息進行處理。如果客戶端沒有 ack,那么這個變量里面的消息 ID 就會越來越多,一旦某個消息被 ack,它就會對應開始減少。這個變量也被 Redis 官方稱為 PEL (Pending Entries List)。
消息 ID 和消息內容
消息 ID
消息 ID 如果是由 XADD 命令返回自動創建的話,那么它的格式會像這樣:timestampInMillis-sequence (毫秒時間戳-序列號),例如 1527846880585-5,它表示當前的消息是在毫秒時間戳 1527846880585 時產生的,并且是該毫秒內產生的第 5 條消息。
這些 ID 的格式看起來有一些奇怪,為什么要使用時間來當做 ID 的一部分呢? 一方面,我們要 滿足 ID 自增 的屬性,另一方面,也是為了 支持范圍查找 的功能。由于 ID 和生成消息的時間有關,這樣就使得在根據時間范圍內查找時基本上是沒有額外損耗的。
當然消息 ID 也可以由客戶端自定義,但是形式必須是 “整數-整數”,而且后面加入的消息的 ID 必須要大于前面的消息 ID。
消息內容
消息內容就是普通的鍵值對,形如 hash 結構的鍵值對。
增刪改查示例
增刪改查命令很簡單,詳情如下:
1.xadd:追加消息
2.xdel:刪除消息,這里的刪除僅僅是設置了標志位,不影響消息總長度
3.xrange:獲取消息列表,會自動過濾已經刪除的消息
4.xlen:消息長度
5.del:刪除Stream
使用示例:
# *號表示服務器自動生成ID,后面順序跟著一堆key/value 127.0.0.1:6379> xadd codehole * name laoqian age 30 # 名字叫laoqian,年齡30歲 1527849609889-0 # 生成的消息ID 127.0.0.1:6379> xadd codehole * name xiaoyu age 29 1527849629172-0 127.0.0.1:6379> xadd codehole * name xiaoqian age 1 1527849637634-0 127.0.0.1:6379> xlen codehole (integer) 3 127.0.0.1:6379> xrange codehole - + # -表示最小值, +表示最大值 1) 1) 1527849609889-02) 1) "name"2) "laoqian"3) "age"4) "30" 2) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 3) 1) 1527849637634-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> xrange codehole 1527849629172-0 + # 指定最小消息ID的列表 1) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 2) 1) 1527849637634-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> xrange codehole - 1527849629172-0 # 指定最大消息ID的列表 1) 1) 1527849609889-02) 1) "name"2) "laoqian"3) "age"4) "30" 2) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 127.0.0.1:6379> xdel codehole 1527849609889-0 (integer) 1 127.0.0.1:6379> xlen codehole # 長度不受影響 (integer) 3 127.0.0.1:6379> xrange codehole - + # 被刪除的消息沒了 1) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 2) 1) 1527849637634-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> del codehole # 刪除整個Stream (integer) 1獨立消費示例
我們可以在不定義消費組的情況下進行 Stream 消息的 獨立消費,當 Stream 沒有新消息時,甚至可以阻塞等待。Redis 設計了一個單獨的消費指令 xread,可以將 Stream 當成普通的消息隊列(list)來使用。使用 xread 時,我們可以完全忽略 消費組(Consumer Group) 的存在,就好比 Stream 就是一個普通的列表(list):
# 從Stream頭部讀取兩條消息 127.0.0.1:6379> xread count 2 streams codehole 0-0 1) 1) "codehole"2) 1) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30"2) 1) 1527851493405-02) 1) "name"2) "yurui"3) "age"4) "29"# 從Stream尾部讀取一條消息,毫無疑問,這里不會返回任何消息 127.0.0.1:6379> xread count 1 streams codehole $ (nil)# 從尾部阻塞等待新消息到來,下面的指令會堵住,直到新消息到來 127.0.0.1:6379> xread block 0 count 1 streams codehole $# 我們從新打開一個窗口,在這個窗口往Stream里塞消息 127.0.0.1:6379> xadd codehole * name youming age 60 1527852774092-0# 再切換到前面的窗口,我們可以看到阻塞解除了,返回了新的消息內容# 而且還顯示了一個等待時間,這里我們等待了93s 127.0.0.1:6379> xread block 0 count 1 streams codehole $ 1) 1) "codehole"2) 1) 1) 1527852774092-02) 1) "name"2) "youming"3) "age"4) "60" (93.11s)客戶端如果想要使用 xread 進行 順序消費,一定要 記住當前消費 到哪里了,也就是返回的消息 ID。下次繼續調用 xread 時,將上次返回的最后一個消息 ID 作為參數傳遞進去,就可以繼續消費后續的消息。
block 0 表示永遠阻塞,直到消息到來,block 1000 表示阻塞 1s,如果 1s 內沒有任何消息到來,就返回 nil:
127.0.0.1:6379> xread block 1000 count 1 streams codehole $ (nil) (1.07s)創建消費者示例
Stream 通過 xgroup create 指令創建消費組(Consumer Group),需要傳遞起始消息 ID 參數用來初始化 last_delivered_id 變量:
127.0.0.1:6379> xgroup create codehole cg1 0-0 # 表示從頭開始消費 OK# $表示從尾部開始消費,只接受新消息,當前Stream消息會全部忽略 127.0.0.1:6379> xgroup create codehole cg2 $ OK 127.0.0.1:6379> xinfo codehole # 獲取Stream信息1) length2) (integer) 3 # 共3個消息3) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) groups8) (integer) 2 # 兩個消費組9) first-entry # 第一個消息 10) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30" 11) last-entry # 最后一個消息 12) 1) 1527851498956-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> xinfo groups codehole # 獲取Stream的消費組信息 1) 1) name2) "cg1"3) consumers4) (integer) 0 # 該消費組還沒有消費者5) pending6) (integer) 0 # 該消費組沒有正在處理的消息 2) 1) name2) "cg2"3) consumers # 該消費組還沒有消費者4) (integer) 05) pending6) (integer) 0 # 該消費組沒有正在處理的消息組內消費示例
Stream 提供了 xreadgroup 指令可以進行消費組的組內消費,需要提供 消費組名稱、消費者名稱和起始消息 ID。它同 xread 一樣,也可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的 PEL (正在處理的消息) 結構里,客戶端處理完畢后使用 xack 指令 通知服務器,本條消息已經處理完畢,該消息 ID 就會從 PEL 中移除,下面是示例:
# >號表示從當前消費組的last_delivered_id后面開始讀# 每當消費者讀取一條消息,last_delivered_id變量就會前進 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > 1) 1) "codehole"2) 1) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30" 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > 1) 1) "codehole"2) 1) 1) 1527851493405-02) 1) "name"2) "yurui"3) "age"4) "29" 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole > 1) 1) "codehole"2) 1) 1) 1527851498956-02) 1) "name"2) "xiaoqian"3) "age"4) "1"2) 1) 1527852774092-02) 1) "name"2) "youming"3) "age"4) "60"# 再繼續讀取,就沒有新消息了 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > (nil)# 那就阻塞等待吧 127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole ># 開啟另一個窗口,往里塞消息 127.0.0.1:6379> xadd codehole * name lanying age 61 1527854062442-0# 回到前一個窗口,發現阻塞解除,收到新消息了 127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole > 1) 1) "codehole"2) 1) 1) 1527854062442-02) 1) "name"2) "lanying"3) "age"4) "61" (36.54s) 127.0.0.1:6379> xinfo groups codehole # 觀察消費組信息 1) 1) name2) "cg1"3) consumers4) (integer) 1 # 一個消費者5) pending6) (integer) 5 # 共5條正在處理的信息還有沒有ack 2) 1) name2) "cg2"3) consumers4) (integer) 0 # 消費組cg2沒有任何變化,因為前面我們一直在操縱cg15) pending6) (integer) 0# 如果同一個消費組有多個消費者,我們可以通過xinfo consumers指令觀察每個消費者的狀態 127.0.0.1:6379> xinfo consumers codehole cg1 # 目前還有1個消費者 1) 1) name2) "c1"3) pending4) (integer) 5 # 共5條待處理消息5) idle6) (integer) 418715 # 空閑了多長時間ms沒有讀取消息了# 接下來我們ack一條消息 127.0.0.1:6379> xack codehole cg1 1527851486781-0 (integer) 1 127.0.0.1:6379> xinfo consumers codehole cg1 1) 1) name2) "c1"3) pending4) (integer) 4 # 變成了5條5) idle6) (integer) 668504# 下面ack所有消息 127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0 (integer) 4 127.0.0.1:6379> xinfo consumers codehole cg1 1) 1) name2) "c1"3) pending4) (integer) 0 # pel空了5) idle6) (integer) 745505QA 1:Stream 消息太多怎么辦? | Stream 的上限
很容易想到,要是消息積累太多,Stream 的鏈表豈不是很長,內容會不會爆掉就是個問題了。xdel 指令又不會刪除消息,它只是給消息做了個標志位。
Redis 自然考慮到了這一點,所以它提供了一個定長 Stream 功能。在 xadd 的指令提供一個定長長度 maxlen,就可以將老的消息干掉,確保最多不超過指定長度,使用起來也很簡單:
> XADD mystream MAXLEN 2 * value 1 1526654998691-0> XADD mystream MAXLEN 2 * value 2 1526654999635-0> XADD mystream MAXLEN 2 * value 3 1526655000369-0> XLEN mystream (integer) 2> XRANGE mystream - + 1) 1) 1526654999635-02) 1) "value"2) "2" 2) 1) 1526655000369-02) 1) "value"2) "3"如果使用 MAXLEN 選項,當 Stream 的達到指定長度后,老的消息會自動被淘汰掉,因此 Stream 的大小是恒定的。目前還沒有選項讓 Stream 只保留給定數量的條目,因為為了一致地運行,這樣的命令必須在很長一段時間內阻塞以淘汰消息。(例如在添加數據的高峰期間,你不得不長暫停來淘汰舊消息和添加新的消息)
另外使用 MAXLEN 選項的花銷是很大的,Stream 為了節省內存空間,采用了一種特殊的結構表示,而這種結構的調整是需要額外的花銷的。所以我們可以使用一種帶有 ~ 的特殊命令:
XADD mystream MAXLEN ~ 1000 * … entry fields here …
它會基于當前的結構合理地對節點執行裁剪,來保證至少會有 1000 條數據,可能是 1010 也可能是 1030。
QA 2:PEL 是如何避免消息丟失的?
在客戶端消費者讀取 Stream 消息時,Redis 服務器將消息回復給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。但是 PEL 里已經保存了發出去的消息 ID,待客戶端重新連上之后,可以再次收到 PEL 中的消息 ID 列表。不過此時 xreadgroup 的起始消息 ID 不能為參數 > ,而必須是任意有效的消息 ID,一般將參數設為 0-0,表示讀取所有的 PEL 消息以及自 last_delivered_id 之后的新消息。
Redis Stream Vs Kafka
Redis 基于內存存儲,這意味著它會比基于磁盤的 Kafka 快上一些,也意味著使用 Redis 我們 不能長時間存儲大量數據。不過如果您想以 最小延遲 實時處理消息的話,您可以考慮 Redis,但是如果 消息很大并且應該重用數據 的話,則應該首先考慮使用 Kafka。
另外從某些角度來說,Redis Stream 也更適用于小型、廉價的應用程序,因為 Kafka 相對來說更難配置一些。
總結
以上是生活随笔為你收集整理的Redis 中的发布/订阅功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【毕业设计】基于微信小程序的备忘录记事助
- 下一篇: Python 将英语单词列表,转换为听写