生活随笔
收集整理的這篇文章主要介紹了
聊聊Redis消息队列-实现异步秒杀
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一、前言
消息隊列(Message Queue), 字面意思就是存放消息的隊列,最簡單的消息隊列模型包括3個角色:
消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker); 生產者:發送消息到消息隊列; 消費者:從消息隊列獲取消息并處理消息。 Redis提供了三種不同的方式來實現消息隊列: list結構:基于List結構模擬消息隊列; PubSub: 基本的點對點消息模型; Stream: 比較完善的消息隊列模型
二、基于List結構模擬消息隊列
消息隊列(Message Queue),字面意思就是存放消息的隊列。而Redis的list數據結構是一個雙向鏈表,很容易模擬出隊列效果。 隊列是入口和出口不在一邊,因此我們可以利用:LPUSH結合RPOP、或者RPUSH結合LPOP來實現; 不過要注意的是,當隊列中沒有消息時RPOP或LPOP操作會返回null,并不像JVM的阻塞隊列那樣會阻塞并等待消息。因此這里應該適用BRPOP或者BLPOP來實現阻塞效果。
2.1 基于List的消息隊列有哪些優缺點?
優點: 利用Redis存儲,不受限于JVM內存上限; 基于Redis的持久化機制,數據安全性有保證; 可以滿足消息有序性; 缺點:
三、基于PubSub的消息隊列
PubSub(發布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個channel,生產者向對應channel發送消息后,所有訂閱者都能收到相關消息。
SUBSCCRIBE channel [channel]: 訂閱一個或多個頻道; PUBLISH channel msg: 向一個頻道發送消息; PSUBSCRIBE pattern[pattern]: 訂閱與pattern格式匹配的所有頻道
3.1 基于PubSub的消息隊列有哪些優缺點?
優點: 缺點: 不支持數據持久化; 無法避免消息丟失; 消息堆積有上限,超出時數據丟失
四、基于Stream的消息隊列
Stream 是 Redis 5.0 引入的一種新數據類型,可以實現一個功能非常完善的消息隊列;
發送消息的命令: 讀取消息的方式之一:XREAD XREAD阻塞方式,讀取最新的消息: 在業務開發中,我們可以循環的調用XREAD阻塞方式來查詢最新消息,從而實現持續監聽隊列的效果,偽代碼如下:
4.1 STREAM類型消息隊列的XREAD命令特點:
消息可回溯; 一個消息可以被多個消費者讀取; 可以阻塞讀取; 有消息漏讀的風險
五、基于Stream的消息隊列-消費者組
消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:
消息分流:隊列中的消息會分流給組內的不同消費者,而不是重復的消費,從而加快消息處理的速度; 消息標示:消費者組會維護一個標識,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標識之后讀取消息。確保每一個消息都會被消費; 消息確認:消費者獲取消費后,消息處于pending狀態,并存入一個pending-list。當處理完成后需要通過XACK來確認消息,標記消息為已處理,才會從pending-list移除。
通俗的講,就是多個消費者在一個隊列中處于競爭關系,多個消費者來處理隊列消息,加快消息處理的速度。而且消費者組會給消息加上一個標識,記錄最新讀到的消息。如果中途消息處理完未提交,消息還會進入pending狀態。進入pending-list中,不會造成數據的丟失。
5.1 STREAM類型消息隊列的XREADGROUP命令特點:
消息可回溯; 可以多消費者爭搶消息,加快消費速度; 可以阻塞讀取; 沒有消息漏讀的風險; 有消息確認機制,保證消息至少被消費一次
總結
六、案例
基于Redis的Stream結構作為消息隊列,實現異步秒殺下單
需求: ① 創建一個Stream類型的消息隊列,名為stream.orders; ② 修改之前的秒殺下單Lua腳本,在認定有搶購資格后,直接向stream.orders中添加消息,內容包含voucherId、userId、orderId; ③ 項目啟動時,開啟一個線程任務,嘗試獲取stream.orders中的消息,完成下單。
local voucherId
= ARGV
[ 1 ]
local userId
= ARGV
[ 2 ]
local orderId
= ARGV
[ 3 ]
local stockKey
= "seckill:stock:" .. voucherId
local orderKey
= "seckill:order:" .. userId
local stock
= redis
. call ( 'get' , stockKey
)
local stockNumber
= tonumber ( stock
)
if ( stockNumber
<= 0 ) then return 1
end
if ( redis
. call ( 'sismember' , orderKey
, userId
) == 1 ) then return 2
end
redis
. call ( 'incrby' , stockKey
, - 1 )
redis
. call ( 'sadd' , orderKey
, userId
)
redis
. call ( 'xadd' , 'stream.orders' , '*' , 'userId' , userId
, 'voucherId' , voucherId
, 'id' , orderId
)
return 0
private IVoucherOrderService proxy
; private static final DefaultRedisScript < Long > SECKILL_SCRIPT ; static { SECKILL_SCRIPT = new DefaultRedisScript < > ( ) ; SECKILL_SCRIPT . setLocation ( new ClassPathResource ( "seckill.lua" ) ) ; SECKILL_SCRIPT . setResultType ( Long . class ) ; } private final BlockingQueue < VoucherOrder > orderTasks
= new ArrayBlockingQueue < > ( 1024 * 1024 ) ; private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors . newSingleThreadExecutor ( ) ; @PostConstruct private void init ( ) { SECKILL_ORDER_EXECUTOR . submit ( new VoucherOrderHandler ( ) ) ; } private class VoucherOrderHandler implements Runnable { String queueName
= "stream.orders" ; @Override public void run ( ) { while ( true ) { try { List < MapRecord < String , Object , Object > > list
= stringRedisTemplate
. opsForStream ( ) . read ( Consumer . from ( "g1" , "c1" ) , StreamReadOptions . empty ( ) . count ( 1 ) . block ( Duration . ofSeconds ( 2 ) ) , StreamOffset . create ( queueName
, ReadOffset . lastConsumed ( ) ) ) ; if ( CollectionUtils . isEmpty ( list
) ) { continue ; } MapRecord < String , Object , Object > record
= list
. get ( 0 ) ; Map < Object , Object > values
= record
. getValue ( ) ; VoucherOrder voucherOrder
= BeanUtil . fillBeanWithMap ( values
, new VoucherOrder ( ) , true ) ; handleVoucherOrder ( voucherOrder
) ; stringRedisTemplate
. opsForStream ( ) . acknowledge ( queueName
, "g1" , record
. getId ( ) ) ; } catch ( Exception e
) { log
. error ( "處理訂單異常" , e
) ; handlePendingList ( ) ; } } } private void handlePendingList ( ) { while ( true ) { try { List < MapRecord < String , Object , Object > > list
= stringRedisTemplate
. opsForStream ( ) . read ( Consumer . from ( "g1" , "c1" ) , StreamReadOptions . empty ( ) . count ( 1 ) , StreamOffset . create ( queueName
, ReadOffset . from ( "0" ) ) ) ; if ( CollectionUtils . isEmpty ( list
) ) { break ; } MapRecord < String , Object , Object > record
= list
. get ( 0 ) ; Map < Object , Object > values
= record
. getValue ( ) ; VoucherOrder voucherOrder
= BeanUtil . fillBeanWithMap ( values
, new VoucherOrder ( ) , true ) ; handleVoucherOrder ( voucherOrder
) ; stringRedisTemplate
. opsForStream ( ) . acknowledge ( queueName
, "g1" , record
. getId ( ) ) ; } catch ( Exception e
) { log
. error ( "處理訂單異常" , e
) ; try { TimeUnit . MILLISECONDS . sleep ( 20 ) ; } catch ( InterruptedException ex
) { ex
. printStackTrace ( ) ; } } } } } private void handleVoucherOrder ( VoucherOrder voucherOrder
) { Long userId
= voucherOrder
. getUserId ( ) ; RLock lock
= redissonClient
. getLock ( "lock:order:" + userId
) ; boolean isLock
= lock
. tryLock ( ) ; if ( ! isLock
) { log
. error ( "不允許重復下單" ) ; return ; } try { proxy
. createVoucherOrder ( voucherOrder
) ; } finally { lock
. unlock ( ) ; } } @Override public Result seckillVoucher ( Long voucherId
) { Long userId
= UserHolder . getUser ( ) . getId ( ) ; Long orderId
= redisIdWorker
. nextId ( "orderId" ) ; Long result
= stringRedisTemplate
. execute ( SECKILL_SCRIPT , Collections . emptyList ( ) , voucherId
. toString ( ) , userId
. toString ( ) , String . valueOf ( orderId
) ) ; int r
= Objects . requireNonNull ( result
) . intValue ( ) ; if ( r
!= 0 ) { return Result . fail ( r
== 1 ? "庫存不足" : "不能重復下單" ) ; } VoucherOrder voucherOrder
= VoucherOrder . builder ( ) . id ( orderId
) . userId ( userId
) . voucherId ( voucherId
) . build ( ) ; orderTasks
. add ( voucherOrder
) ; proxy
= ( IVoucherOrderService ) AopContext . currentProxy ( ) ; return Result . ok ( orderId
) ; }
總結
以上是生活随笔 為你收集整理的聊聊Redis消息队列-实现异步秒杀 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。