Redis03-优惠券秒杀
一、分布式Id:訂單id
在分布式架構下,傳統(tǒng)生成Id的方式已經(jīng)不再適用,應該生成全局唯一的
分布式Id需要滿足的五個特性:全局性、唯一性、安全性、可用性、高性能
@Component public class RedisIdWorker {@Autowiredprivate StringRedisTemplate stringRedisTemplate;private final long COUNT_BITS = 32L;private final long BEGIN = 1652197856L;public long nextId(String keyPrefix){//1.獲取當前時間戳與項目運行時間戳的差值long timeFiled = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) - BEGIN;//2.生成序列化號String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);return timeFiled << COUNT_BITS | count;} }每個業(yè)務每天都重新開始計數(shù),求出當前時間與項目運行時間戳的差值,將其左移32位在對count進行 | 操作后轉化為十進制
該方法的思路:一個long類型有8個字節(jié),用四個字節(jié)(32位)存放時間信息(136.102208 年),四個字節(jié)存放這一天的訂單數(shù)量。那么這種方法從項目開始計時,能夠正確運行2^32 秒,每天最多能生成2^32個id。
其實我們也可以只用16位來存儲當前count,剩下48位來存放時間信息,如果一天內達不到2^32次方的訂單id生成量,不如節(jié)省下來位數(shù)存儲時間信息讓項目運行的更久。
二、優(yōu)惠券下單
我們先來看如下的流程:
這個流程咋一看好像沒有問題,但實際上它涉及到一個并發(fā)問題:
庫存是一個公共資源,線程對它就行讀-寫操作時如果不加鎖則會出現(xiàn)超賣問題
這個問題如何解決呢?當然是通過加鎖來解決,鎖大致分為樂觀鎖和悲觀鎖,我們來探討兩者的差異
樂觀鎖
樂觀鎖的思想也適用于分布式項目
對于優(yōu)惠券秒殺這種寫多讀少的業(yè)務,推薦使用悲觀鎖,樂觀鎖成功率低反而更加消耗性能
一人一單
業(yè)務流程
并發(fā)問題
使用jdk提供的鎖只在它所在的jvm有效,無法鎖住其它jvm,由于該項目是一個分布式項目,我們需要采取分布式鎖解決該問題
分布式鎖
實現(xiàn)方式
MySQL實現(xiàn)方式:通過行鎖或表鎖的機制達到互斥性,但是它把鎖的壓力給到了數(shù)據(jù)庫,而數(shù)據(jù)庫又是相當脆弱的部分無法面對高并發(fā)場景,所以使用MySQL實現(xiàn)互斥鎖的業(yè)務并發(fā)量不能太大。具體看這篇博客:https://cloud.tencent.com/developer/article/1580632
Redis實現(xiàn)分布式鎖
上面這一套流程存在一個問題,它是由鎖標識一致所導致的,當線程一因為業(yè)務超時導致鎖過期時,線程二獲取到了鎖,等到線程一執(zhí)行完后釋放掉的鎖此時已經(jīng)是線程二的了,線程三看鎖沒了又去獲取鎖…
所以設置一個鎖標識是非常重要的,而并發(fā)是以線程為單位的,每個線程都有一個自己的鎖標識就ok啦,那拿什么給每個線程充當鎖標識呢?UUID可以嘛?理論上可以,但如果使用uuid的話,每上一次鎖就要生成一個uuid,是非常消耗性能的操作,有沒有更優(yōu)化的操作呢?
其實每個線程都有一個天然的鎖標識,那就是線程id,但是不同jvm的線程id可能重復,我們可以將uuid設置為靜態(tài)變量作為鎖標識的前綴,用線程id作為鎖標識的后綴,這樣不同機器的uuid是不同的,可以保證一致性,同時uuid作為靜態(tài)變量每臺機器只需要創(chuàng)建一次,不需要每次上鎖都創(chuàng)建,提高了性能
Redis實現(xiàn)簡單的分布式鎖
public class SimpleRedisLock {private String key;private StringRedisTemplate stringRedisTemplate;private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}public SimpleRedisLock(String key, StringRedisTemplate stringRedisTemplate) {this.key = key;this.stringRedisTemplate = stringRedisTemplate;}public boolean tryLock(long timeSeconds){String id = ID_PREFIX + Thread.currentThread().getId();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, id, timeSeconds, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}public void unlock(){String val = stringRedisTemplate.opsForValue().get(key);if(val != null && val.equals(ID_PREFIX + Thread.currentThread().getId())){stringRedisTemplate.delete(key);}} }其實上述代碼還存在一點小問題,那就是unlock操作,由于它不是原子性操作,那么當它從redis的鎖中取值后,redis鎖過期了,那么雖然它可以執(zhí)行if內的語句,但此時它的鎖已經(jīng)過期了,它有可能會刪除其它線程的鎖
解決辦法也很簡單,就是想法讓unlock中的操作變成原子操作,一說這個很多人第一反應是加鎖,但jvm和redis是兩個獨立運行的進程,沒法加鎖,我們可以通過讓redis執(zhí)行l(wèi)ua腳本來保證操作的原子性
unlock.lua
setnx實現(xiàn)分布式鎖存在的問題
Redission提供的分布式鎖
自動續(xù)約
我們回憶一下,之前使用setnx充當分布式鎖的時候為什么要設置過期時間呢?是防止由于redis宕機導致這個鎖無法釋放成為死鎖,那么當cpu資源緊張且某個業(yè)務執(zhí)行時間比較長的時候,很可能鎖就被自動釋放導致其它線程也進入到該臨界區(qū),存在并發(fā)安全問題,那么Redission的出現(xiàn)完美的解決了該問題!
Redission內置了WatchDog機制,默認每隔十秒檢查一遍鎖,若存在給它重新設置30秒過期時間,如果redis宕機了,那么WatchDog會停止續(xù)約操作,鎖會在30秒后自動釋放
WatchDog機制只有在未設置過期時間時才有效,也就是未設置leaseTime才生效
可重入
可重入機制是利用hashmap的特性,使用field標識鎖標識,val標識鎖的重入次數(shù),釋放鎖時將val-1,若val=0,則刪除key
主從一致性
當我們的項目使用了redis集群時,當我們對主節(jié)點執(zhí)行setnx進行上鎖的時候,從節(jié)點還未來得及同步主節(jié)點數(shù)據(jù)時,主節(jié)點宕機導致鎖丟失,我們可以采用redission提供的multilock的機制來解決該問題
鎖丟失
-
那么 Redisson 是如何解決上述問題的呢?既然導致主從一致性問題發(fā)生的主要原因是主從同步延時問題,Redisson 干脆直接舍棄了主從節(jié)點,所有 Redis 節(jié)點都是獨立的節(jié)點,相互之間無任何關系,都可以做讀寫操作。此時,我們想獲取鎖就必須依次向多個 Redis 都去獲取鎖(之前直接向 Master 節(jié)點獲取就可以),多個 Redis 節(jié)點都保存鎖的標識,才算獲取成功
-
這樣一來,由于所有節(jié)點都是獨立的,所以避免了主從一致性問題;又由于所有的節(jié)點都保存了鎖標識,即使由一個節(jié)點宕機,其他的節(jié)點也保存有鎖的標識,保證了高可用,并且可用性會隨著節(jié)點的增多而增高
-
此外,我們還以為給這些獨立的節(jié)點再加上從節(jié)點 Slave,即使一個獨立節(jié)點宕機了導致其對應的從節(jié)點變成新的主節(jié)點,且節(jié)點上鎖標識丟失了也沒有關系,因為我們只有在每一個節(jié)點都拿到鎖才算成功, 盡管可以在這個空虛的節(jié)點上獲取到鎖,但在其他節(jié)點上是獲取不到的,最終仍然是失敗,因此只要有任意一個節(jié)點存貨,其他線程就不可能拿到鎖,就不會出現(xiàn)鎖失效問題。這樣,既保留了主從同步機制,又確保了 Redis 集群的高可用特性,同時還避免了主從一致所引發(fā)的鎖失效問題,這個方案就叫做 mutilLock
簡而言之,就是對多個redis節(jié)點進行上鎖,必須全部上鎖成功才算成功,哪怕有一個節(jié)點的鎖沒有釋放當前線程都無法獲得鎖
秒殺優(yōu)化
優(yōu)化前
單線程處理數(shù)據(jù)的校驗以及訂單的創(chuàng)建,而從判斷秒殺庫存->訂單創(chuàng)建這個過程中我們需要采用加鎖的機制來保證不發(fā)生超賣問題和一人一單的正確性,這樣做雖然可行,但卻無法面對高并發(fā)場景,因為加鎖的過程太長,并且加鎖范圍內有好幾個隊數(shù)據(jù)庫的操作,業(yè)務太重,影響用戶體驗
優(yōu)化后
將數(shù)據(jù)校驗的業(yè)務用redis處理,處理成功后返回給客戶端,而對數(shù)據(jù)庫的操作額外使用消費者線程在后臺中處理,既能保證數(shù)據(jù)一致性,又大大加快響應速度。
數(shù)據(jù)校驗
接口定義:
@PostMapping(“seckill/{id}”)
public Result seckillVoucher(@PathVariable(“id”) Long voucherId)
業(yè)務流程
注意:上述流程涉及到并發(fā)安全問題,在看下面代碼前可以思考一下加鎖的位置、對誰上鎖以及鎖的釋放
代碼如下
public Result secKillVoucher2(long id) {//1.設置keyString stock_key = RedisConstants.SECKILL_STOCK_KEY + id;String order_key = "seckill:order:" + id;long order_id = idWorker.nextId("order");//1.1判斷redis中是否有存放stock的keyif(StrUtil.isBlank(stringRedisTemplate.opsForValue().get(stock_key))) {//1.2 若不存在該key則刷新緩存Integer stock = cacheClient.queryDataByMutex(RedisConstants.SECKILL_STOCK_KEY, id, Integer.class, (v_id) -> {SeckillVoucher voucher = seckillVoucherService.getById(v_id);return voucher.getStock();}, 10, TimeUnit.MINUTES);}//2.判斷該用戶id是否存在set中Long userId = UserHolder.getUser().getId();RLock lock = redissonClient.getLock("lock:user:" + userId);boolean isLock = lock.tryLock();if(!isLock){return Result.fail("請無重復下單");}try {Boolean flag = stringRedisTemplate.opsForSet().isMember(order_key, userId.toString());if (flag) {return Result.fail("請無重復下單");}long add = stringRedisTemplate.opsForSet().add(order_key, userId.toString());//3.校驗庫存是否夠Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get(stock_key));if (stock > 0) {stringRedisTemplate.opsForValue().decrement(stock_key);} else {return Result.fail("庫存不足");}//4.封裝好訂單信息發(fā)送到消息隊列HashMap<String, String> map = new HashMap<>();map.put("id", String.valueOf(order_id));map.put("userId", String.valueOf(userId));map.put("voucherId", String.valueOf(id));stringRedisTemplate.opsForStream().add("stream.orders", map);return Result.ok(order_id);}catch (Exception e){log.error(e.getMessage());return Result.fail(e.getMessage());}finally {System.out.println("seckill unlock");if(lock.isLocked() && lock.isHeldByCurrentThread()) {lock.unlock();}}}上述代碼大家覺得有問題嗎,我對它做過壓測,同一用戶的請求一秒1000的并發(fā)量是沒有問題的,而不同用戶訪問時,會出現(xiàn)超賣問題,因為上鎖的范圍是針對UserId的,不同用戶之間互不干擾,我們可以通過更改鎖的對象,使用voucherId作為鎖,但這樣會大大降低并發(fā)量,接下來我將介紹一種更好的解決方案:lua腳本
lua腳本
Lua也算一門古老的語言了,玩魔獸世界的玩家應該對它不陌生,WOW的插件就是用Lua腳本編寫的。在高并發(fā)的網(wǎng)絡游戲中Lua大放異彩被廣泛使用。
Lua廣泛作為其它語言的嵌入腳本,尤其是C/C++,語法簡單,小巧,源碼一共才200多K,這可能也是Redis官方選擇它的原因。
為什么使用lua解決并發(fā)問題:因為在lua中的操作是原子性的,redis一旦執(zhí)行某個lua腳本,在執(zhí)行完成之前是不會執(zhí)行其它請求的
那么我們可以將上述代碼的鎖范圍內的業(yè)務邏輯寫在lua腳本中,讓lua腳本來保證它們的串行執(zhí)行
代碼
@Overridepublic Result secKillVoucher(long id) {//1.設置keyString stock_key = RedisConstants.SECKILL_STOCK_KEY + id;String order_key = "seckill:order:" + id;long order_id = idWorker.nextId("order");//1.1判斷redis中是否有存放stock的keyif(StrUtil.isBlank(stringRedisTemplate.opsForValue().get(stock_key))) {//1.2 若不存在該key則刷新緩存Integer stock = cacheClient.queryDataByMutex(RedisConstants.SECKILL_STOCK_KEY, id, Integer.class, (v_id) -> {SeckillVoucher voucher = seckillVoucherService.getById(v_id);return voucher.getStock();}, 10, TimeUnit.MINUTES);}//2.執(zhí)行l(wèi)ua腳本,判斷是否符合條件,若符合條件則發(fā)送到消息隊列Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Arrays.asList(order_key, stock_key), UserHolder.getUser().getId().toString(),String.valueOf(order_id),String.valueOf(id));//3. 條件校驗失敗if(res.intValue() != 0){return Result.fail("下單失敗,請勿重復下單");}return Result.ok(order_id);}lua腳本
--獲取key local order_key = KEYS[1] local order_stock_key = KEYS[2]--獲取用id local user_id = ARGV[1] -- 獲取訂單id local order_id = ARGV[2] -- 獲取優(yōu)惠券id local voucher_id = ARGV[3]-- 判斷庫存是否足夠 if(tonumber(redis.call("get",order_stock_key)) <= 0) thenreturn 1 end-- 判斷用戶id是否存在該商品的用戶列表中 if( redis.call("sismember",order_key,user_id) == 1) thenreturn 2 end-- 庫存-1 redis.call("incrby",order_stock_key,-1) -- 將userId添加進該商品的用戶列表 redis.call("sadd",order_key,user_id) -- 向消息隊列發(fā)送消息 redis.call("xadd","stream.orders","*","id",order_id,"userId",user_id,"voucherId",voucher_id) return 0消息隊列:Stream
在redis中,有個Stream類型的數(shù)據(jù),可以說是為了消息隊列而生的,若是項目不太大但又需要使用消息隊列,我們可以使用redis的Stream類型來充當消息隊列,它的優(yōu)點是配置簡單,不會額外增加運維成本、使用方便
基本的使用語法我已經(jīng)寫在了另一篇博客中:https://blog.csdn.net/qq_42861526/article/details/124753721
接下來我主要給大家講一下Stream的特點
消費者組
Stream和消費者組通常是一起出現(xiàn)的,我們可以為Stream創(chuàng)建一個或多個消費者組,每個消費者組包含一個或多個消費者,消費者組之間共享消息,同一個消費者組下的消費者競爭消息
特點
- 消息分流:隊列中的消息會分流給消費者組中不同的消費者,不會讓他們重復消費,提高消息處理速度
- 消息標示:每個消費者組會維護一個標示,記錄它最后處理過的消息,哪怕它宕機后重啟,也能從標示之后開始消費
- 消息確認:消費者獲取消費后,消息會變成pending狀態(tài)并添加到pending-list中,當消費者對該消息執(zhí)行XACK后,該消息才會重pending-list中移除
讀取、解析消息
private class VoucherHandler implements Runnable{@Overridepublic void run() {try {while(true) {//1.從消息隊列中取出訂單List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));//2.判斷消息是否為空if(list == null || list.isEmpty()){continue;}//3.解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> map = record.getValue();VoucherOrder order = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);//4.創(chuàng)建訂單createVoucherOrder(order);stringRedisTemplate.opsForStream().acknowledge("stream.orders","g1",record.getId());}} catch (Exception e) {log.error("處理訂單異常");log.error(e.getMessage());handlePendingMsg();}}}private void handlePendingMsg(){try {while(true) {//1.從消息隊列中取出訂單List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.from("0")));//2.判斷消息是否為空if(list == null || list.isEmpty()){break;}//3.解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> map = record.getValue();VoucherOrder order = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);//4.創(chuàng)建訂單createVoucherOrder(order);stringRedisTemplate.opsForStream().acknowledge("stream.orders","g1",record.getId());}} catch (Exception e) {try {Thread.sleep(1000);} catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}log.error("處理訂單異常");}}消費消息
@Transactionalpublic void createVoucherOrder(VoucherOrder order) {log.debug("創(chuàng)建訂單......");//1.userid設置鎖RLock lock = redissonClient.getLock("lock:order:user:" + order.getUserId());boolean tryLock = lock.tryLock();if(!tryLock){log.debug("請勿重復下單");return ;}try{//2. 查看該用戶是否搶過該優(yōu)惠券Integer count = query().eq("user_id", order.getUserId()).eq("voucher_id", order.getVoucherId()).count();if (count > 0) {log.debug("請勿重復下單");return ;}//3.扣減庫存System.out.println("扣減庫存");boolean success = seckillVoucherService.update().setSql("stock = stock-1").eq("voucher_id", order.getVoucherId()).gt("stock", 0).update();//3.1庫存不足if(!success){log.debug("庫存不足");return;}//4.保存訂單到數(shù)據(jù)庫save(order);}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}問題:我們已經(jīng)在發(fā)送消息時保證了并發(fā)安全,為什么在處理消息時還采用加鎖和數(shù)據(jù)校驗?
答:因為在redis主從同步的集群下,我們判斷我們的用戶id是否存在voucherId對應set中的操作是一個讀操作,它會去從節(jié)點讀取,若主節(jié)點已經(jīng)添加了這個userId,而從節(jié)點還沒來得及同步消息,那么代碼會繼續(xù)往下執(zhí)行,將同樣的消息發(fā)送到消息隊列中
但在我們這個項目中,由于使用lua腳本來保證執(zhí)行的原子性,即使在主從集群下,lua腳本首先會發(fā)給主節(jié)點,主節(jié)點再將腳本分發(fā)給從節(jié)點一起執(zhí)行,所以主從的所有節(jié)點一次只能執(zhí)行一個lua腳本請求,不會出現(xiàn)上面所說的情況,我們添加鎖和數(shù)據(jù)校驗只是為了增強程序的健壯性,因為執(zhí)行消費消息的線程是后臺執(zhí)行的,它并不要求響應速度,所以額外增加一點業(yè)務也無傷大雅
總結
以上是生活随笔為你收集整理的Redis03-优惠券秒杀的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MySQL-day02作业
- 下一篇: 高斯混合模型聚类算法和K-Means聚类