Redis消息队列 | 黑马点评
目錄
一、認識消息隊列
二、List模擬消息隊列
三、PubSub的消息隊列
四、Stream的消息隊列(重點)
????????1、單消費模式
????????2、消費者組
五、redis三種消息隊列對比?
?六、優化秒殺實戰
1、創建消息隊列
2、修改下單腳本
?3、接收消息處理
一、認識消息隊列
消息隊列,字面意思就存放消息的隊列。最簡單的消息隊列模型包括3個角色:
消息隊列:存儲和管理消息,也被稱為消息代理
生產者:發送消息到消息隊列
消費者:從消息隊列獲取消息并處理消息
解決了jvm堵塞隊列內存不足的問題,而且消息隊列是可以持久化的,宕機了依然能夠保存。
redis提供三種不同方式實現消息隊列:
- list結構:基于list結構模擬消息隊列
- PubSub:基于的點對點消息隊列
- Stream:比較完善的消息隊列模型(推薦)
二、List模擬消息隊列
redis的list結構是一個雙向鏈表,很容易模擬出隊列效果
隊列是入口和出口不在一邊,因此可以用LPUSH結合RPOP、或者RPUSH結合LPOP實現
但是,當隊列沒有消息時pop就會返回null,并不會jvm堵塞隊列那樣堵塞并等待消息,因此這里應該使用BRPOP或者BLPOP來實現堵塞隊列。
缺點:
無法避免消息丟失。從消息隊列取到消息,還沒來得及處理就掛掉了,這個消息就消失了。
只支持單消費者。一個人拿走就從隊列里面彈出了。
三、PubSub的消息隊列
PubSub(發布訂閱)是redis2.0版本引入的消息傳遞模型,消費者可以訂閱一個或多個channel(頻道),生產者向對應channel發送消息后,所有訂閱者都能收到相關消息。
支持多生產、多消費
缺點:
不支持數據持久化(剛剛的list本質是做存儲的我們拿來當隊列所以可以持久化)
無法避免消息丟失。
消息堆積有上限,超出時數據丟失。(緩存空間是有上限的)
四、Stream的消息隊列(重點)
Stream是redis5.0引入的一種新數據類型,可以實現一個功能非常完善的消息隊列。
1、單消費模式
特點:
- 消息可回溯。不消失永久保存在隊列里。
- 一個消息可以被多個消費者讀取。讀完不消失的,可以多個讀
- 可以堵塞讀取
- 有消息漏讀的風險
2、消費者組
消費者組(Consumer Group):將多個消費者劃分到一個組,監聽同一個隊列。
?消費者監聽消息的基本思路
stream類型消息隊列的消費者組特點:
- 消息可回溯
- 可以多消費者爭搶消息,加快消費速度
- 可以阻塞讀取
- 沒有消息漏鍍的風險
- 有消息確認機制,保證消息至少被消費一次
五、redis三種消息隊列對比?
?六、優化秒殺實戰
1、創建消息隊列
創建一個stream類型的消息隊列,名為stream.orders
2、修改下單腳本
修改之前秒殺下單lua腳本,認定有搶購資格后,直接向steam.orders中添加消息,內容包含voucher、userId、orderId
-- 優惠券id local voucherId = ARGV[1] -- 用戶id local userId = ARGV[2] -- 訂單id local orderId = ARGV[3]-- 庫存key local stockKey = "seckill:stock:"..voucherId -- 訂單key local orderKey = "seckill:order:"..voucherId-- 判斷庫存是否充足 if(tonumber(redis.call('get', stockKey)) <= 0) thenreturn 1 end-- 判斷用戶是否已經下過單 if(redis.call('sismember', orderKey, userId) == 1) thenreturn 2 end-- 扣減庫存 redis.call('incrby', stockKey, -1)-- 將 userId 存入當前優惠券的 set 集合 redis.call('sadd', orderKey, userId)-- 將訂單信息存入到消息隊列中 xadd stream.orders * k1 v1 k2 v2 redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0?3、接收消息處理
項目啟動時,開啟一個線程任務,嘗試獲取stream.orders中的消息,完成下單
/**** 創建線程池*/private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();/**** 容器啟動時,便開始創建獨立線程,從隊列中讀取數據,創建訂單*/@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while(true){try {// 獲取消息隊列中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 判斷訂單信息是否為空if(list == null || list.isEmpty()){// 如果為 null,說明沒有消息,繼續下一次循環continue;}// 解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 創建訂單createVoucherOrder(voucherOrder);// 確認消息 xack s1 g1 idstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常!", e);handlePendingList();}}}private void handlePendingList() {while(true){try {// 獲取 pending-list 中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 判斷訂單信息是否為空if(list == null || list.isEmpty()){break;}// 解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 創建訂單createVoucherOrder(voucherOrder);// 確認消息 xack s1 g1 idstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常!", e);try {Thread.sleep(100);} catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}}}}}總結
以上是生活随笔為你收集整理的Redis消息队列 | 黑马点评的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java.集合 框架,接口,常用集合特点
- 下一篇: 树状结构数据查询方法