使用 Redis Stream 实现消息队列
使用 Redis Stream 實現消息隊列
Intro
Redis 5.0 中增加了 Stream 的支持,利用 Stream 我們可以實現可靠的消息隊列,并且支持一個消息被多個消費者所消費,可以很好的實現消息隊列
Simple Usage
首先我們來看一個簡單版本的 Stream 使用,我們在代碼里使用一個發布者,一個消費者來模擬一個簡單的消息隊列的場景
來看下面的測試代碼:
private?const?string?StreamKey?=?"test-simple-stream";public?static?async?Task?MainTest() {await?RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);//?register?background?consumer_?=?Task.Factory.StartNew(Consume).ConfigureAwait(false);//await?Publish(); }private?static?async?Task?Publish() {Console.WriteLine("Press?Enter?to?publish?messages,?Press?Q?to?exit");var?input?=?Console.ReadLine();while?(input?is?not?"q"?and?not?"Q"){var?redis?=?RedisHelper.GetDatabase();for?(var?i?=?0;?i?<?10;?i++){await?redis.StreamAddAsync(StreamKey,?"message",?$"test_message_{i}");}input?=?Console.ReadLine();} }private?static?async?Task?Consume() {var?lastMsgId?=?"0-0";while?(true){await?InvokeHelper.TryInvokeAsync(async?()?=>{var?redis?=?RedisHelper.GetDatabase();var?entries?=?await?redis.StreamReadAsync(StreamKey,?lastMsgId,?2);if?(entries.Length?==?0){return;}foreach?(var?entry?in?entries){Console.WriteLine(entry.Id);entry.Values.Dump();//?delete?message?if?you?want//?redis.StreamDelete(StreamKey,?new[]?{?entry.Id?});}lastMsgId?=?entries[^1].Id;});await?Task.Delay(200);} }上面的代碼會使用一個后臺線程來運行一個 Consumer 來從 Stream 中讀取消息,有兩種消費消息的模式,一種是自己維護一個處理的消息 offset,每次從這個 offset 之后讀取新消息,另外一種模式不需要維護本地的 offset,可以在處理完消息之后直接刪掉消息,默認消息是不會刪消息的,所以如果不刪消息的話需要維護
Publisher 每次會發布 10 條消息,Consumer 每次會讀取兩條消息,處理之后會等待 200 ms,之后再查詢消息
來看一下運行效果吧:
Consumer Group
上面的示例會相對來說比較簡單,只有一個 Consumer,但是在比較常用的場景下往往會有多個消費者處理,
比如說用戶注冊成功之后,發布一條消息可能會有多個 Consumer 同時給用戶發郵件或短信以及給用戶加積分等操作,這種場景下使用上面的模式就不合適了,Redis Stream 中增加了 Consumer Group 的概念(有的人甚至稱 Redis 內置了一個 Kafka),在創建了 Consumer Group 之后,向 Stream 發布消息的時候會廣播到各個 Consumer Group 中,每個 Consumer Group 的消息消費是獨立的,不同的 Consumer Group 的消費速度可以不一致,一個 Consumer Group 也可以有多個 Consumer 同時運行,同一個 Group 內的多個 Consumer 是會共享一個 Consumer Group 的消息消費,而且我們可以手動進行消息的 ACK
來看下面的示例代碼吧:
private?const?string?StreamKey?=?"test-stream-group"; private?static?int?_consumerCount;public?static?async?Task?MainTest() {await?RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);//?register?background?consumer_?=?await?Task.Factory.StartNew(Consume).ConfigureAwait(false);_?=?await?Task.Factory.StartNew(Consume).ConfigureAwait(false);//await?Publish(); }private?static?async?Task?Publish() {Console.WriteLine("Press?Enter?to?publish?messages,?Press?Q?to?exit");var?input?=?Console.ReadLine();while?(input?is?not?"q"?and?not?"Q"){var?redis?=?RedisHelper.GetDatabase();for?(var?i?=?0;?i?<?10;?i++){await?redis.StreamAddAsync(StreamKey,?"message",?$"test_message_{i}");}input?=?Console.ReadLine();} }private?static?async?Task?Consume() {Interlocked.Increment(ref?_consumerCount);var?groupName?=?$"group-{_consumerCount}";var?consumerName?=?$"consumer-{_consumerCount}";var?redis?=?RedisHelper.GetDatabase();redis.StreamCreateConsumerGroup(StreamKey,?groupName);while?(true){await?InvokeHelper.TryInvokeAsync(async?()?=>{var?messages?=?await?redis.StreamReadGroupAsync(StreamKey,?groupName,?consumerName,?count:?SecurityHelper.Random.Next(1,?4));if?(messages.Length?==?0){return;}foreach?(var?message?in?messages){Console.WriteLine($"{groupName}-{message.Id}-{message.Values.ToJson()}");await?redis.StreamAcknowledgeAsync(StreamKey,?groupName,?message.Id);}});await?Task.Delay(200);} }上面的示例代碼會先注冊兩個 Consumer Group,兩個 Consumer Group 內各有一個 consumer,你也可以使用多個 consumer,為了體現各個 Consumer Group 是獨立的,每次獲取消息的 Count 是會隨機指定的,在讀取的消息之后會輸出消息內容來代替處理消息的邏輯,處理完成之后進行消息的 ACK,消息的發布邏輯和上面的示例是類似的
上述代碼執行輸出示例:
可以看到我們發布的消息,每一個 consumer group 都會處理消息,而且處理消息的速度是獨立的,互不影響
通過 XINFO 命令我們可以對 Stream 做一些監控
More
利用 Redis 的 Stream 我們可以實現可靠的一個消息機制,stream 的每一條消息都會有一個消息 Id,默認是兩個部分,一個部分是時間戳,另一個部分是一個序列號,消息 Id 可以自定義,但是通常情況下推薦用默認的 id
Redis 中的 List、HashSet、Set、ZSet 這些數據類型中沒有元素的時候會把對應的 Key 也會刪掉,但是 Stream 是不會的,Stream 允許沒有消息的時候依然存在
Redis Stream 使用的時候需要注意我們是可以指定 Stream 的消息長度的,如果我們指定了最大消息長度 10000,超出 10000 的時候舊消息就會被擠出隊列,可能會出現消息的丟失,需要對 Stream 做必要的監控和報警
References
https://redis.io/topics/streams-intro
https://redis.io/commands
https://github.com/WeihanLi/SamplesInPractice/tree/master/RedisSample
總結
以上是生活随笔為你收集整理的使用 Redis Stream 实现消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: github star破13k,Dapr
- 下一篇: Autofac框架初识与应用