用 Redis 实现分布式锁(Java 版)
用 Redis 實現分布式鎖(Java 版)
- 核心代碼
- 完整代碼
??分布式鎖是一種解決分布式臨界資源并發讀寫的一種技術。本文詳細介紹了在 Java 中使用 Redis 實現分布式鎖的方法。為了方便,這里使用了 Spring Boot 中的 RedisTemplate 來與 Redis 進行交互。本文的分布式鎖有如下功能:
-
是分布式鎖,互斥粒度為線程級。
-
可重入。同一線程可多次上鎖。
-
鎖不唯一。可以持有多個名稱不同的鎖,不同名的鎖之間的創建與釋放互相獨立。
-
支持鎖過期自動釋放。
-
支持持鎖超時自動異步續時。
【漸進式問答】【Q & A】
Q:Redis 實現分布式鎖的原理是什么?
A:鎖本質上是一種邏輯控制,使用一個布爾型的變量就可以。比方說,可以讓 Redis 中的某個鍵存在表示上了某種鎖,當 Redis 中沒有這個鍵時表示沒有上這個鎖。
而 Redis 是獨立于用戶程序的一種擁有集群功能的全局分布式應用,因此可以用于實現分布式鎖。
Q:如何實現 Redis 分布式鎖的線程級可重入。
A:可以使用 ThreadLocal 記錄每個線程當前上鎖的重入次數。每當上鎖時,就將記錄中的重入次數加 1。每當釋放鎖時,就將其減 1。特別地,在釋放鎖時,如果重入次數為 1,就真正地在 Redis 中刪除此鎖。
Q:對于這種情況如何應對:一個程序在設置了 Redis 分布式鎖之后,然后業務代碼中拋出了異常,結果程序跳過了后面的釋放鎖代碼就退出了。
A:可以將加分布式鎖的代碼置于一個 try 塊 中,然后在 try 塊 后面加不含 catch 塊 的 finally 子句,并在 finally 子句 中編寫釋放鎖的代碼。這樣,無論中途發生了什么異常,釋放鎖的代碼一定會執行。
Q:在問題【3】中,如果一個程序在沒有獲得鎖的情況下就退出,這不就可能會釋放正在持有鎖的程序的鎖嗎?
A:對于這種情況可以借助 ThreadLocal,用兩種方法來應對:
-
使用 ThreadLocal 為每個線程生成一個 ID,然后將此 ID 存于 Redis 鎖中,等釋放鎖之時,檢查鎖中的 ID 與本線程的 ID 是否一致。如果一致才真正釋放鎖。
-
利用本 Redis 鎖的互斥性。使用 ThreadLocal 記錄每個線程當前上鎖的重入次數。因為本 Redis 鎖是互斥鎖,所以只可能有一個線程,它的當前上鎖次數大于 0。因此,釋放鎖的時候只需要判斷自己當前的上鎖次數是否為 0 即可。如果不為 0,才真正釋放鎖。
本文使用的是這種方法。
Q:對于這種情況如何應對:一個程序在設置了 Redis 分布式鎖之后,還沒來得及釋放該鎖就崩潰了。此時,所有的程序都無法獲取受該鎖束縛的資源。
A:可以選擇在上鎖的同時引入超時時間。此時如果問題中的程序崩潰時,鎖會自動釋放。
Q:在問題【5】中,如果該程序在上鎖之后還沒有來得及設置超過時間就崩潰呢?
A:可以讓上鎖和設置超過時間這兩個操作變成同一個原子操作。
現在,Spring Boot 中的 RedisTemplate 有這種 API 可以實現這一點。
如果有的技術沒有提供這種 API,可以使用 Redis 中的 Eval 命令,這個命令支持運行一段 Lua 腳本,這個命令是原子性的。
【錯誤的解決方案】
-
Q1:在問題【5】中,如果該程序在上鎖之后還沒有來得及設置超過時間就崩潰呢?
-
A1:可以將本次上鎖時間作為 Redis 鎖的值存入,同時規定某個鍵存在表示上了某種鎖,沒有這個鍵時表示沒有上這個鎖。然后令讀取鎖的程序通過比較上鎖時間與當前時間來判斷此鎖有沒有過期。
-
Q2:如果鎖過期了,如何保證只有一個程序可以獲得鎖?
-
A2:可以使用類似于樂觀鎖的機制,在上鎖時同時將上鎖應用的 ID 存入,然后在加鎖之后再讀取鎖數據,判斷最后加鎖成功的是不是自己即可。
-
Q3:要怎么做到對“最后加鎖”的判斷?如何解決這種情況:兩個程序都要加鎖,而第一個程序執行很快,加鎖之后又認為自己成功加上了鎖。然后第二個執行較慢的程序將鎖覆蓋,也認為自己成功加上了鎖。現在,兩個程序都認為自己加上了鎖。
-
A3:這確實是錯誤的解決方案。
Q:在問題【5】中,如果該程序在上鎖后業務代碼執行時間過長而鎖超時怎么辦?
A:可以在加鎖之后開啟一個子線程進行異步周期性地續時。當釋放鎖時,再中斷結束這個續時線程。
Q:在問題【7】中,每次上鎖都開啟新線程,這個開銷是不是有點大了?
A:那可以選擇讓同一個名稱的鎖對應同一個續時線程。具體來說,事先開啟一個續時線程,這個續時線程不會因鎖釋放而銷毀。然后讓這個續時線程完成所有線程上鎖的續時任務。
Q:在問題【8】中,如果程序需要使用 1w 個鎖來鎖 1w 條不同的數據,那這樣在后臺開啟 1w 個續時線程是不是容易溢出?
A:可以在創建續時線程時設置續時線程的個數上限。如果達到上限,可以采取很多策略,比如令新的續時線程像問題【7】一樣在鎖釋放時銷毀。
Q:問一個與創建 Redis 分布式鎖無關的問題。對于秒殺的業務,假設購買商品前要加鎖,如果沒有拿到鎖,會自旋等待。現在如果有 1w 個購買請求,但商品數只有 100 個,這就意味著理論上在秒殺結束之后,有 9900 個請求是不需要拿到鎖的。如何保證這一點?如何防止這樣的一種情況:明明秒殺已經結束了,剩下的 9900 個請求仍然在自旋排隊拿鎖,并在拿到鎖之后執行業務代碼。
A:如果這個秒殺項目使用了一種高速緩存技術,可以選擇在秒殺結束之后,將秒殺結束這一信號存于高速緩存中。當請求在自旋等待時,不斷在高速緩存中查詢秒殺是否結束,如果是就結束自旋。同時在拿到鎖之后,也要查詢秒殺是否結束,如果是就跳過某些業務代碼。
Q:在問題【10】中,為什么在拿到鎖之后,也要查詢秒殺是否結束?
A:在線程在自旋等待過程中,其可能會位于自旋等待過程中的任何一個時間點。如果有大量的線程位于拿鎖的時間點,那么當其它其它線程釋放鎖時,即便是秒殺結束了,自旋等待中判斷秒殺是否結束的代碼也不會起作用。因為當它拿到鎖的時候,就會馬上退出循環,而不會經歷這個自旋中的判斷代碼。因此在拿到鎖之后,也要執行這個判斷代碼。
【編程難點】(這些問題的答案不方便文字描述,這里從略。讀者可以在文末筆者的源代碼中找到解決方案)
在規定一個分布式鎖對應一個續時線程的情況下,如果需要使用多個鎖,如何避免多線程并發時,為每一個鎖創建了多個續時線程?
如何在多線程共用同一續時線程的情況下,控制此續時線程的續時停止與恢復?
如何保證在得到和釋放分布式鎖時,續時線程能立刻感知到?(如果續時線程剛好在休眠,那它就不能立刻感知到)
如何防止續時線程意外中止?
核心代碼
package org.wangpai.demo.lock;import java.util.concurrent.TimeUnit; import lombok.Setter; import org.springframework.data.redis.core.RedisTemplate;/*** 分布式可重入鎖** @since 2022-3-13*/ public class DistributedReentrantLock {@Setterprivate static RedisTemplate<String, String> redisTemplate;private final String name;/*** 線程級可重入** @since 2022-3-13*/private final ThreadLocal<Integer> lockedTimes = new ThreadLocal<>();@Setterprivate int lockedDuration = 10;private TimeUnit lockedDurationUnit = TimeUnit.SECONDS;public DistributedReentrantLock(String name) {this.name = name;this.lockedTimes.set(0);}/*** 嘗試加鎖,如果失敗,返回 false** @since 2022-3-13*/public boolean tryLock(long timeout, TimeUnit unit) {var times = this.lockedTimes.get();boolean isSuccessful = true;if (times == 0) {isSuccessful = redisTemplate.opsForValue().setIfAbsent(this.name, this.name, timeout, unit);}if (isSuccessful) {this.lockedTimes.set(times + 1);var renewal = DistributedLockFactory.getRenewal(this.name);renewal.setTimeRenewal(this.lockedDuration).setTimeWaiting(this.lockedDuration / 2).resume();}return isSuccessful;}/*** 嘗試最多持續 60s 的鎖** @since 2022-3-13*/public boolean tryLock() {return this.tryLock(this.lockedDuration, this.lockedDurationUnit);}/*** 嘗試加鎖,如果失敗,返回 false** @since 2022-3-13*/public boolean tryLock(long timeout) {return this.tryLock(timeout, this.lockedDurationUnit);}/*** 只有本線程上過鎖時,調用此方法才有效** @since 2022-3-13*/public void unlock() {var times = this.lockedTimes.get();if (times == 0) {System.out.println("本線程沒有上過鎖,解鎖失敗");return;}// 本線程是否上過鎖if (times == 1) {/*** 因為這個鎖是互斥鎖,所以只要本線程加鎖過,其它線程不可能可以加鎖,* 因此這鎖一定是本線程加的,故無需驗證線程 id*/redisTemplate.delete(this.name);var renewal = DistributedLockFactory.getRenewal(this.name);renewal.suspend();System.out.println("完全釋放分布式鎖");}this.lockedTimes.set(times - 1);} } package org.wangpai.demo.lock;import java.util.concurrent.ConcurrentHashMap; import lombok.Setter; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component;/*** @since 2022-3-13*/ @Component public class DistributedLockFactory {private static ConcurrentHashMap<String, LockRenewal> asynchronousRenewals = new ConcurrentHashMap<>();private static LockTypeRegister lockTypeRegister;@Setterprivate static volatile int threadLimit = 100;public DistributedLockFactory(RedisTemplate<String, String> redisTemplate, LockTypeRegister register) {DistributedReentrantLock.setRedisTemplate(redisTemplate);LockRenewal.setRedisTemplate(redisTemplate);lockTypeRegister = register;}public static DistributedReentrantLock getDistributedLock(LockType lockType, String originKey) {var lockKey = LockTypeUtil.keyCompound(lockType, originKey);// 雙重檢查鎖定:第一重判斷if (!asynchronousRenewals.containsKey(lockKey)) {var lock = lockTypeRegister.getRegister().get(lockType);try {lock.lock(); // 對 lockType 上鎖// 雙重檢查鎖定:第二重判斷if (!asynchronousRenewals.containsKey(lockKey)) {var timeRenewal = new LockRenewal();timeRenewal.setLockKey(lockKey).setStarted(true);// 當總線程數達到上限時,設置 timeRenewal 快速銷毀if (asynchronousRenewals.entrySet().size() >= threadLimit) {timeRenewal.setFastClosed(true);}asynchronousRenewals.put(lockKey, timeRenewal);var renewalThead = new Thread(timeRenewal);timeRenewal.setRunningThread(renewalThead);renewalThead.start();}} finally {lock.unlock();}}return new DistributedReentrantLock(lockKey);}public static LockRenewal getRenewal(String name) {return asynchronousRenewals.get(name);} } package org.wangpai.demo.lock;import java.util.concurrent.TimeUnit; import lombok.Setter; import lombok.experimental.Accessors; import org.springframework.data.redis.core.RedisTemplate;/*** 為了避免反復新建線程的開銷,此類會事先就后臺運行,然后供所有的線程共用** @since 2022-3-19*/ @Accessors(chain = true) public class LockRenewal implements Runnable {@Setterprivate static RedisTemplate<String, String> redisTemplate;@Setterprivate Thread runningThread;/*** 鎖的名稱** @since 2022-3-19*/@Setterprivate volatile String lockKey;/*** 控制線程的啟動與終止** @since 2022-3-19*/@Setterprivate volatile boolean started = false;/*** 控制續時任務的暫停與恢復** @since 2022-3-19*/private volatile boolean isRunning = false;/*** 當系統的總線程數過高時,將此字段置位。此時當 isSuspended 也為 true 時,銷毀本線程,而不是靜默執行空任務** @since 2022-3-19*/@Setterprivate volatile boolean fastClosed = false;/*** 控制續時任務執行間隔時間,單位:秒** 注意:timeWaiting 值不能大于 timeRenewal 值。建議 timeWaiting 為 timeRenewal 的 1/3。* timeWaiting 與 timeRenewal 過于接近容易導致碰巧因啟動時間差,而使續時任務正處于休眠狀態而沒有及時續時** @since 2022-3-19*/@Setterprivate volatile long timeWaiting = 20;/*** 控制續時時長,單位:秒。** @since 2022-3-19*/@Setterprivate volatile long timeRenewal = 60;private int count = 0;@Overridepublic void run() {System.out.println("續時線程啟動");while (this.started) {try {if (this.fastClosed && !this.isRunning) {return;}// 第一步應該先休眠,而不應該馬上續時try {Thread.sleep(this.timeWaiting * 1000);} catch (InterruptedException interruptedException) {// 續時任務被外部中斷時,線程不退出this.afterInterrupt();continue; // 中斷后應該重新開始}this.count++;if (this.isRunning) {this.renewDistributedLock();}} catch (Throwable throwable) {// 此 catch 塊是為了避免中途某代碼引發異常而導致此線程意外中止throwable.printStackTrace();}}System.out.println("續時線程終止");}/*** 此方法必須中斷續時任務的休眠** @since 2022-3-19*/public void resume() {this.isRunning = true;this.count = 0;this.runningThread.interrupt();}/*** 此方法必須中斷續時任務的休眠** @since 2022-3-19*/public void suspend() {this.isRunning = false;this.count = 0;this.runningThread.interrupt();}private void afterInterrupt() {this.runningThread.isInterrupted(); // 清除中斷標志System.out.println("續時任務休眠中斷,計數重置");}private void renewDistributedLock() {redisTemplate.expire(this.lockKey, this.timeRenewal, TimeUnit.SECONDS);System.out.println("第" + this.count + "次續時成功");} } package org.wangpai.demo.service;import org.springframework.boot.autoconfigure.cache.CacheType; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.wangpai.demo.lock.DistributedLockFactory; import org.wangpai.demo.lock.LockType;/*** @since 2022-3-20*/ @Service public class DemoService {@Transactionalpublic DemoService demoService() {final var spinTime = 1; // 自旋時間,單位:秒var someKey="someKey";var lock = DistributedLockFactory.getDistributedLock(LockType.LOCK_1, someKey);try {int count = 0;// 獲取分布式鎖while (!lock.tryLock()) {try {Thread.sleep(spinTime * 1000);} catch (InterruptedException exception) {exception.printStackTrace();}// TODO:判斷現在是否已經不需要得到鎖了。如果是,退出此自旋System.out.println("第" + (++count) + "次沒有拿到鎖,嘗試下一次");}System.out.println("得到分布式鎖");// TODO:判斷現在是否已經不需要得到鎖了。如果是,直接放棄鎖System.out.println("得到分布式鎖,但可能已經不需要了"); // TODO:需要將此日志更正為更具體的日志信息// TODO:業務代碼} finally {System.out.println("嘗試釋放分布式鎖");// 無論前面是否拋出異常,此處都要釋放鎖。這不會釋放別人的鎖lock.unlock();}// TODO:不需要上鎖的業務代碼return this;}}完整代碼
??已上傳至 GitCode 中,可免費下載:https://gitcode.net/wangpaiblog/20220321-distributedlock-redis
總結
以上是生活随笔為你收集整理的用 Redis 实现分布式锁(Java 版)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 极简高并发秒杀商城
- 下一篇: 同步阻塞、同步非阻塞、异步阻塞、异步非阻