使用Redis Stream来做消息队列和在Asp.Net Core中的实现
寫在前面
我一直以來使用redis的時候,很多低烈度需求(并發要求不是很高)需要用到消息隊列的時候,在項目本身已經使用了Redis的情況下都想直接用Redis來做消息隊列,而不想引入新的服務,kafka和RabbitMQ等;
奈何這兄弟一直不給力;
雖然 Redis 的Pub/Sub 是實現了發布/訂閱的,但這家伙最坑的是:丟數據
由于Pub/Sub 只是簡單的實現了發布訂閱模式,簡單的溝通起生產者和消費者,當接收生產者的數據后并立即推送或者說轉發給訂閱消費者,并不會做任何的持久化、存儲操作。由此:
消費者(客戶端)掉線;
消費者未訂閱(所以使用的時候一定記得先訂閱再生產);
服務端宕機;
消費者消費不過來,消息堆積(生產數據受數據緩沖區限制);
以上情況都會導致生產數據的丟失,基于上坑,據我所知大家很少使用Pub/Sub ;
不過官方的哨兵集群通信的時候就是用的Pub/Sub;
然后,各路大佬結合隊列、阻塞等等實現了各種各樣的方案,主要是使用:BLPOP+LPUSH 的實現
這里就不一一展開了,有興趣請看葉老板文章;
可能是各種實現都會帶來各種的問題,redis的官方也看到了社區的掙扎。終于,到了Redis5.0,官方帶來了消息隊列的實現:Stream。
Redis Stream介紹
簡單來說Redis Stream 就是想用Redis 做消息隊列的最佳推薦;
XADD--發布消息
XADD?stream1?*?name?hei?age?18 XADD?stream1?*?name?zhangshan?age?19?#再發一條127.0.0.1:6379>?XADD?stream1?*?name?hei?age?18 "1631628884174-0" 127.0.0.1:6379>?XADD?stream1?*?name?zhangshan?age?19? "1631628890025-0"其中的'*'表示讓 Redis 自動生成唯一的消息 ID,格式是 「時間戳-自增序號」
XREAD--訂閱消息
訂閱消息
XREAD?COUNT?5?STREAMS?stream1?0-0127.0.0.1:6379>?XREAD?COUNT?5?STREAMS?stream1?0-0? 1)?1)?"stream1"2)?1)?1)?"1631628884174-0"2)?1)?"name"2)?"hei"3)?"age"4)?"18"2)?1)?"1631628890025-0"2)?1)?"name"2)?"zhangshan"3)?"age"4)?"19"'0-0' 表示從開頭讀取
如果需繼續拉取下一條,需傳入上一條消息的id
阻塞等待消息
XREAD?COUNT?5?BLOCK?50000?STREAMS?stream1?1631628890025-0阻塞等待消息id ‘1631628890025-0’ 后的消息
50000 阻塞時間(毫秒) ‘0’ 表示無限期阻塞
從到這里就可以看出 Pub/Sub多端訂閱的最大優點,Stream也是支持的。有的同學很快就發現問題了,這里多端訂閱后,沒有消息確認ACK機制。
沒錯,因為現在所有的消費者都是訂閱共同的消息,多端訂閱,如果某個客戶端ACK某條消息后,其他端消費不了,就實現不了多端消費了。
由此,引出 分組:GROUP
GROUP--訂閱分組消息(多端訂閱)
同樣先發布消息
XADD?stream1?*?name?hei?age?18 XADD?stream1?*?name?zhangshan?age?19127.0.0.1:6379>?XADD?stream1?*?name?hei?age?18 "1631629080208-0" 127.0.0.1:6379>?XADD?stream1?*?name?zhangshan?age?19? "1631629084083-0"XGROUP CREATE 創建分組
創建分組1
XGROUP?CREATE?stream1?group1?0-0127.0.0.1:6379>?XGROUP?CREATE?stream1?group1?0-0?? OK‘0-0’ ?表示從開頭讀取
'>' 表示讀取最新,未被消費過的消息
XREADGROUP--分組讀取
分組 group1
XREADGROUP?GROUP?group1?consumer1?COUNT?5?STREAMS?stream1?>consumer1 消費者名稱, redis服務器會記住第一次使用的消費者名稱;
127.0.0.1:6379>?XREADGROUP?GROUP?group1?consumer1?COUNT?5?STREAMS?stream1?>?? 1)?1)?"stream1"2)?1)?1)?"1631628884174-0"2)?1)?"name"2)?"hei"3)?"age"4)?"18"2)?1)?"1631628890025-0"2)?1)?"name"2)?"zhangshan"3)?"age"4)?"19"3)?1)?"1631629080208-0"2)?1)?"name"2)?"hei"3)?"age"4)?"18"4)?1)?"1631629084083-0"2)?1)?"name"2)?"zhangshan"3)?"age"4)?"19" 127.0.0.1:6379>?XREADGROUP?GROUP?group1?consumer1?COUNT?5?STREAMS?stream1?>?? (nil)同樣
‘0-0’ ?表示從開頭讀取
'>' 表示讀取最新,未被消費過的消息 (可以看到命令執行第二遍已經讀不到新消息了)
分組 group2
127.0.0.1:6379>?XGROUP?CREATE?stream1?group2?0-0?? OK 127.0.0.1:6379>?XREADGROUP?GROUP?group2?consumer1?COUNT?5?STREAMS?stream1?>?? 1)?1)?"stream1"2)?1)?1)?"1631628884174-0"2)?1)?"name"2)?"hei"3)?"age"4)?"18"2)?1)?"1631628890025-0"2)?1)?"name"2)?"zhangshan"3)?"age"4)?"19"3)?1)?"1631629080208-0"2)?1)?"name"2)?"hei"3)?"age"4)?"18"4)?1)?"1631629084083-0"2)?1)?"name"2)?"zhangshan"3)?"age"4)?"19可以看到可以讀到同樣的消息,多端訂閱沒有問題;
當然分組也支持阻塞讀取:
#和XREAD一樣 XREAD?COUNT?5?BLOCK?0?STREAMS?queue?1618469127777-0?#分組阻塞 XREADGROUP?GROUP?group2?consumer1?COUNT?5?BLOCK?0?STREAMS?stream1?>‘0’ 表示無限期阻塞,單位(毫秒)
XPENDING--待處理消息
消息使用XREADGROUP 讀取后會進入待處理條目列表(PEL);
我們看看:
XPENDING?stream1?group2127.0.0.1:6379>??XPENDING?stream1?group2 1)?(integer)?4 2)?"1631628884174-0" 3)?"1631629084083-0" 4)?1)?1)?"consumer1"2)?"4"表示:
(integer) 4 ? ? ?//表示當前消費者組的待處理消息的數量
"1631628884174-0" ? ? ? //消息最大id
"1631629084083-0" ? ? ?//最小id
"consumer1" ? ? ?// 消費者名稱
"4" //消費者待處理消息數量
XACK--刪除已處理消息(消息確認機制)
我們已經知道group2待處理消息有4條,我們從頭讀取看看:
XREADGROUP?GROUP?group2?consumer1?COUNT?5?STREAMS?stream1?0-0127.0.0.1:6379>?XREADGROUP?GROUP?group2?consumer1?COUNT?5?STREAMS?stream1?0-0 1)?1)?"stream1"2)?1)?1)?"1631628884174-0"2)?1)?"name"2)?"hei"3)?"age"4)?"18"2)?1)?"1631628890025-0"2)?1)?"name"2)?"zhangshan"3)?"age"4)?"19"3)?1)?"1631629080208-0"2)?1)?"name"2)?"hei"3)?"age"4)?"18"4)?1)?"1631629084083-0"2)?1)?"name"2)?"zhangshan"3)?"age"4)?"19"假設最后一條消息 ‘1631629084083-0’ 我已處理完成
127.0.0.1:6379>?XACK?stream1?group2?1631629084083-0 (integer)?1再看:
127.0.0.1:6379>?XREADGROUP?GROUP?group2?consumer1?COUNT?5?STREAMS?stream1?0-0 1)?1)?"stream1"2)?1)?1)?"1631628884174-0"2)?1)?"name"2)?"hei"3)?"age"4)?"18"2)?1)?"1631628890025-0"2)?1)?"name"2)?"zhangshan"3)?"age"4)?"19"3)?1)?"1631629080208-0"2)?1)?"name"2)?"hei"3)?"age"4)?"18"127.0.0.1:6379>??XPENDING?stream1?group2 1)?(integer)?3 2)?"1631628884174-0" 3)?"1631629080208-0" 4)?1)?1)?"consumer1"2)?"3"可以清楚看到goroup2 待處理消息剩下3條;
這時 Redis 已經把這條消息標記為「處理完成」不再追蹤;
Stream在Asp.net Core中的使用
private static string _connstr = "172.16.3.119:6379"; private static string _keyStream = "stream1"; private static string _nameGrourp = "group1"; private static string _nameConsumer = "consumer1";發布:
csRedis.XAdd(_keyStream,?"*",?("name",?"message1"));訂閱:
static?async?Task?CsRedisStreamConsumer() {Console.WriteLine("CsRedis?StreamConsumer?start!");var?csRedis?=?new?CSRedis.CSRedisClient(_connstr);csRedis.XAdd(_keyStream,?"*",?("name",?"message1"));try{csRedis.XGroupCreate(_keyStream,?_nameGrourp);}catch?{?}(string?key,?(string?id,?string[]?items)[]?data)[]?product;(string?Pid,?string?Platform,?string?Time)?data?=?(null,?null,?null);while?(true){try{product?=?csRedis.XReadGroup(_nameGrourp,?_nameConsumer,?1,?10000,?(_keyStream,?">"));if?(product?.Length?>?0?==?true?&&?product[0].data?.Length?>?0?==?true){Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}");product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value?=>{Console.WriteLine($"????{value}");});//csRedis.XAck(_keyStream,?_nameGrourp,?product[0].data[0].id);}}catch?(Exception){//throw;}} }CSRedisCore
動畫2這里的超時報錯可通過修改連接參數:syncTimeout 解決
CSRedisCore支持阻塞讀取;
StackExchange.Redis
發布:
db.StreamAdd(_keyStream,?"name",?"message1",?"*");訂閱:
static?async?Task?StackExchangeRedisStreamConsumer() {Console.WriteLine("StackExchangeRedis?StreamConsumer?start!");var?redis?=?ConnectionMultiplexer.Connect(_connstr);var?db?=?redis.GetDatabase();try{///初始化方式1//db.StreamAdd(_keyStream,?"name",?"message1",?"*");//db.StreamCreateConsumerGroup(_keyStream,?_nameGrourp);//方式2db.StreamCreateConsumerGroup(_keyStream,?_nameGrourp,?StreamPosition.NewMessages);}catch?{?}StreamEntry[]?data?=?null;while?(true){data?=?db.StreamReadGroup(_keyStream,?_nameGrourp,?_nameConsumer,?">",?count:?1,?noAck:?true);if?(data?.Length?>?0?==?true){Console.WriteLine($"message-id:{data.FirstOrDefault().Id}");data.FirstOrDefault().Values.ToList().ForEach(c?=>{Console.WriteLine($"????{c.Name}:{c.Value}");});db.StreamAcknowledge(_keyStream,?_nameGrourp,?data.FirstOrDefault().Id);}} }動畫StackExchange.Redis 有點比較坑的是不存在阻塞讀取;理由:https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing
QA
Q:Stream是否支持AOF、RDB持久化?
**A:**支持,其它數據類型一樣,每個寫操作,也都會寫入到 RDB 和 AOF 中。
Q:Stream是否還是會丟數據?若是,何種情況下?;
**A:**會;1、AOF是定時寫盤的,如果數據還在內存中時redis服務宕機就會;2、主從切換時(從庫還未同步完成主庫發來的數據,就被提成主庫);3、消息隊列超MAXLEN限制;
總結
技術中有的時候沒有“銀彈”,只有更適合的技術,汝之蜜糖彼之砒霜;
很多時候的技術選型都是個比較麻煩的東西,對選型人的要求很高;你可能不是只需要熟悉其中的一種路線,而是要踩過各種各樣的坑,再根據當前受限的環境,選擇比較適合目前需求/團隊的;
回到Stream上,我認為目前Stream能滿足挺大部分隊列需求;
特別是“在項目本身已經使用了Redis的情況下都想直接用Redis來做消息隊列,而不想引入新的更專業的mq,比如kafka和RabbitMQ的時候”
當然,最終決定需要用更專業的mq與否的,還是需求;
引用
http://www.redis.cn/
https://database.51cto.com/art/202104/659208.htm
https://github.com/2881099/csredis/
https://stackexchange.github.io/StackExchange.Redis/Streams.html
文章博客園地址請點擊“閱讀原文”
總結
以上是生活随笔為你收集整理的使用Redis Stream来做消息队列和在Asp.Net Core中的实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解决ASP.NET Core部署到IIS
- 下一篇: oh,我的老伙计,你看看这近五十个dap