一文掌握Redisson分布式锁原理|干货推荐
ReentrantLock 重入鎖
在說 Redisson 之前我們先來說一下 JDK 可重入鎖: ReentrantLock
ReentrantLock 保證了 JVM 共享資源同一時刻只允許單個線程進(jìn)行操作
實現(xiàn)思路
ReentrantLock 內(nèi)部公平鎖與非公平鎖繼承了 AQS[AbstractQueuedSynchronizer]
1、AQS 內(nèi)部通過 volatil 修飾的 int 類型變量 state 控制并發(fā)情況下線程安全問題及鎖重入
2、將未競爭到鎖的線程放入 AQS 的隊列中通過 LockSupport#park、unPark 掛起喚醒
簡要描述哈, 詳情可以查看具體的文章
Redisson
可以直接查看 Github Redisson官網(wǎng) 介紹, 沒有了解過的小伙伴, 看一下 Redisson 的 WIKI 目錄, 仔細(xì)瞅瞅 Redis 是如何被 Redisson 武裝到牙齒的
上下滾動查看更多
這里先過一下和文章有關(guān)的一部分內(nèi)容
通過項目簡介可以看出來, 寫這個項目介紹的人水平非常哇塞哈, 從第一段咱們就知道了兩個問題
Redisson 是什么
Redisson 是架設(shè)在 Redis 基礎(chǔ)上的一個 Java 駐內(nèi)存數(shù)據(jù)網(wǎng)格框架, 充分利用 Redis 鍵值數(shù)據(jù)庫提供的一系列優(yōu)勢, 基于 Java 實用工具包中常用接口, 為使用者提供了 一系列具有分布式特性的常用工具類
Redisson 的優(yōu)勢
使得原本作為協(xié)調(diào)單機(jī)多線程并發(fā)程序的工具包 獲得了協(xié)調(diào)分布式多機(jī)多線程并發(fā)系統(tǒng)的能力, 大大降低了設(shè)計和研發(fā)大規(guī)模分布式系統(tǒng)的難度
同時結(jié)合各富特色的分布式服務(wù), 更進(jìn)一步 簡化了分布式環(huán)境中程序相互之間的協(xié)作
“了解到這里就差不多了, 就不向下擴(kuò)展了, 想要了解詳細(xì)用途的, 翻一下上面的目錄
Redisson 重入鎖
由于 Redisson 太過于復(fù)雜, 設(shè)計的 API 調(diào)用大多用 Netty 相關(guān), 所以這里只對 如何加鎖、如何實現(xiàn)重入鎖進(jìn)行分析以及如何鎖續(xù)時進(jìn)行分析
創(chuàng)建鎖
我這里是將 Redisson 的源碼下載到本地了
下面這個簡單的程序, 就是使用 Redisson 創(chuàng)建了一個非公平的可重入鎖
lock() 方法加鎖成功 默認(rèn)過期時間 30 秒, 并且支持 "看門狗" 續(xù)時功能
public?static?void?main(String[]?args)?{Config?config?=?new?Config();config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6379");RedissonClient?redisson?=?Redisson.create(config);RLock?lock?=?redisson.getLock("myLock");try?{lock.lock();//?業(yè)務(wù)邏輯}?finally?{lock.unlock();} }我們先來看一下 RLock 接口的聲明
public?interface?RLock?extends?Lock,?RLockAsync?{}RLock 繼承了 JDK 源碼 JUC 包下的 Lock 接口, 同時也繼承了 RLockAsync
RLockAsync 從字面意思看是 支持異步的鎖, 證明獲取鎖時可以異步獲取
“看了 Redisson 的源碼會知道, 注釋比黃金貴 ?????
由于獲取鎖的 API 較多, 我們這里以 lock() 做源碼講解, 看接口定義相當(dāng)簡單
/***?lock?并沒有指定鎖過期時間,?默認(rèn)?30?秒*?如果獲取到鎖,?會對鎖進(jìn)行續(xù)時*/ void?lock();獲取鎖實例
根據(jù)上面的小 Demo, 看下第一步獲取鎖是如何做的
RLock?lock?=?redisson.getLock("myLock");// name 就是鎖名稱 public?RLock?getLock(String?name)?{//?默認(rèn)創(chuàng)建的同步執(zhí)行器,?(存在異步執(zhí)行器,?因為鎖的獲取和釋放是有強(qiáng)一致性要求,?默認(rèn)同步)return?new?RedissonLock(connectionManager.getCommandExecutor(),?name); }Redisson 中所有 Redis 命令都是通過 ...Executor 執(zhí)行的
獲取到默認(rèn)的同步執(zhí)行器后, 就要初始化 RedissonLock
public?RedissonLock(CommandAsyncExecutor?commandExecutor,?String?name)?{super(commandExecutor,?name);this.commandExecutor?=?commandExecutor;//?唯一IDthis.id?=?commandExecutor.getConnectionManager().getId();//?等待獲取鎖時間this.internalLockLeaseTime?=?commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();//?ID?+?鎖名稱this.entryName?=?id?+?":"?+?name;//?發(fā)布訂閱,?后面關(guān)于加、解鎖流程會用到this.pubSub?=?commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }嘗試獲取鎖
我們來看一下 RLock#lock() ?底層是如何獲取鎖的
@Override public?void?lock()?{try?{lock(-1,?null,?false);}?catch?(InterruptedException?e)?{throw?new?IllegalStateException();} }leaseTime: 加鎖到期時間, -1 使用默認(rèn)值 30 秒
unit: 時間單位, 毫秒、秒、分鐘、小時...
interruptibly: 是否可被中斷標(biāo)示
private?void?lock(long?leaseTime,?TimeUnit?unit,?boolean?interruptibly)?throws?InterruptedException?{//?獲取當(dāng)前線程IDlong?threadId?=?Thread.currentThread().getId();//??????嘗試獲取鎖,?下面重點分析Long?ttl?=?tryAcquire(-1,?leaseTime,?unit,?threadId);//?成功獲取鎖,?過期時間為空if?(ttl?==?null)?{return;}//?訂閱分布式鎖,?解鎖時進(jìn)行通知RFuture<RedissonLockEntry>?future?=?subscribe(threadId);if?(interruptibly)?{commandExecutor.syncSubscriptionInterrupted(future);}?else?{commandExecutor.syncSubscription(future);}try?{while?(true)?{//?再次嘗試獲取鎖ttl?=?tryAcquire(-1,?leaseTime,?unit,?threadId);//?成功獲取鎖,?過期時間為空,?成功返回if?(ttl?==?null)?{break;}//?鎖過期時間如果大于零,?則進(jìn)行帶過期時間的阻塞獲取if?(ttl?>=?0)?{try?{//?獲取不到鎖會在這里進(jìn)行阻塞,?Semaphore,?解鎖時釋放信號量通知future.getNow().getLatch().tryAcquire(ttl,?TimeUnit.MILLISECONDS);}?catch?(InterruptedException?e)?{if?(interruptibly)?{throw?e;}future.getNow().getLatch().tryAcquire(ttl,?TimeUnit.MILLISECONDS);}//?鎖過期時間小于零,?則死等,?區(qū)分可中斷及不可中斷}?else?{if?(interruptibly)?{future.getNow().getLatch().acquire();}?else?{future.getNow().getLatch().acquireUninterruptibly();}}}}?finally?{//?取消訂閱unsubscribe(future,?threadId);} }這一段代碼是用來執(zhí)行加鎖, 繼續(xù)看下方法實現(xiàn)
Long?ttl?=?tryAcquire(-1,?leaseTime,?unit,?threadId);private?Long?tryAcquire(long?waitTime,?long?leaseTime,?TimeUnit?unit,?long?threadId)?{return?get(tryAcquireAsync(waitTime,?leaseTime,?unit,?threadId)); }lock() 以及 tryLock(...) 方法最終都會調(diào)用此方法, 分為兩個流程分支
1、tryLock(...) API 異步加鎖返回
2、lock() & tryLock() API 異步加鎖并進(jìn)行鎖續(xù)時
private?<T>?RFuture<Long>?tryAcquireAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit,?long?threadId)?{//?執(zhí)行?tryLock(...)?才會進(jìn)入if?(leaseTime?!=?-1)?{//?進(jìn)行異步獲取鎖return?tryLockInnerAsync(waitTime,?leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);}//?嘗試異步獲取鎖,?獲取鎖成功返回空,?否則返回鎖剩余過期時間RFuture<Long>?ttlRemainingFuture?=?tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS,?threadId,?RedisCommands.EVAL_LONG);//?ttlRemainingFuture?執(zhí)行完成后觸發(fā)此操作ttlRemainingFuture.onComplete((ttlRemaining,?e)?->?{if?(e?!=?null)?{return;}//?ttlRemaining?==?null?代表獲取了鎖//?獲取到鎖后執(zhí)行續(xù)時操作if?(ttlRemaining?==?null)?{scheduleExpirationRenewal(threadId);}});return?ttlRemainingFuture; }繼續(xù)看一下 tryLockInnerAsync(...) 詳細(xì)的加鎖流程, 內(nèi)部采用的 Lua 腳本形式, 保證了原子性操作
到這一步大家就很明了了, 將 Lua 腳本被 Redisoon 包裝最后通過 Netty 進(jìn)行傳輸
<T>?RFuture<T>?tryLockInnerAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit,?long?threadId,?RedisStrictCommand<T>?command)?{internalLockLeaseTime?=?unit.toMillis(leaseTime);return?evalWriteAsync(getName(),?LongCodec.INSTANCE,?command,"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+"redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+"return?nil;?"?+"end;?"?+"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+"redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+"return?nil;?"?+"end;?"?+"return?redis.call('pttl',?KEYS[1]);",Collections.singletonList(getName()),?internalLockLeaseTime,?getLockName(threadId)); } “evalWriteAsync(...) 是對 Eval 命令的封裝以及 Netty 的應(yīng)用就不繼續(xù)跟進(jìn)了
加鎖 Lua
執(zhí)行 Redis 加鎖的 Lua 腳本, 截個圖讓大家看一下參數(shù)以及具體含義
KEYS[1]: myLock
ARGV[1]: 36000... 這個是過期時間, 自己測試的, 單位毫秒
ARGV[2]: UUID + 線程 ID
#?KEYS[1]?代表上面的?myLock #?判斷?KEYS[1]?是否存在,?存在返回?1,?不存在返回?0 if?(redis.call('exists',?KEYS[1])?==?0)?then#?當(dāng)?KEYS[1]?==?0?時代表當(dāng)前沒有鎖#?使用?hincrby?命令發(fā)現(xiàn)?KEYS[1]?不存在并新建一個?hash#?ARGV[2]?就作為?hash?的第一個key,?val?為?1#?相當(dāng)于執(zhí)行了?hincrby?myLock?91089b45...?1redis.call('hincrby',?KEYS[1],?ARGV[2],?1);#?設(shè)置?KEYS[1]?過期時間,?單位毫秒redis.call('pexpire',?KEYS[1],?ARGV[1]);return?nil; end; #?查找?KEYS[1]?中?key?ARGV[2]?是否存在, 存在回返回?1 if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then#?同上,?ARGV[2]?為?key?的?val?+1redis.call('hincrby',?KEYS[1],?ARGV[2],?1);#?同上redis.call('pexpire',?KEYS[1],?ARGV[1]); return?nil; end; #?返回?KEYS[1]?過期時間,?單位毫秒 return?redis.call('pttl',?KEYS[1]);整個 Lua 腳本加鎖的流程畫圖如下:
現(xiàn)在回過頭看一下獲取到鎖之后, 是如何為鎖進(jìn)行延期操作的
鎖續(xù)時
之前有和軍哥聊過這個話題, 他說的思路和 Redisson 中體現(xiàn)的基本一致
說一下 Redisson 的具體實現(xiàn)思路吧, 中文翻譯叫做 "看門狗"
1、獲取到鎖之后執(zhí)行 "看門狗" 流程
2、使用 Netty 的 Timeout 實現(xiàn)定時延時
3、比如鎖過期 30 秒, 每過 1/3 時間也就是 10 秒會檢查鎖是否存在, 存在則更新鎖的超時時間
可能會有小伙伴會提出這么一個疑問, 如果檢查返回存在, 設(shè)置鎖過期時剛好鎖被釋放了怎么辦?
有這樣的疑問, 代表確實用心去考慮所有可能發(fā)生的情況了, 但是不必?fù)?dān)心哈
Redisson 中使用的 Lua 腳本做的檢查及設(shè)置過期時間操作, 本身是原子性的不會出現(xiàn)上面情況
“如果不想要引用 Netty 的包, 使用延時隊列等包工具也是可以完成 "看門狗"
這里也貼一哈相關(guān)代碼, 能夠讓小伙伴更直觀的了解如何鎖續(xù)時的
我可真是個暖男, 上代碼 RedissonLock#tryAcquireAsync(...)
private?<T>?RFuture<Long>?tryAcquireAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit,?long?threadId)?{//?...//?嘗試異步獲取鎖,?獲取鎖成功返回空,?否則返回鎖剩余過期時間RFuture<Long>?ttlRemainingFuture?=?tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS,?threadId,?RedisCommands.EVAL_LONG);//?ttlRemainingFuture?執(zhí)行完成后觸發(fā)此操作ttlRemainingFuture.onComplete((ttlRemaining,?e)?->?{if?(e?!=?null)?{return;}//?獲取到鎖后執(zhí)行續(xù)時操作if?(ttlRemaining?==?null)?{scheduleExpirationRenewal(threadId);}});return?ttlRemainingFuture; }可以看到續(xù)時方法將 threadId 當(dāng)作標(biāo)識符進(jìn)行續(xù)時
private?void?scheduleExpirationRenewal(long?threadId)?{ExpirationEntry?entry?=?new?ExpirationEntry();ExpirationEntry?oldEntry?=?EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(),?entry);if?(oldEntry?!=?null)?{oldEntry.addThreadId(threadId);}?else?{entry.addThreadId(threadId);renewExpiration();} }知道核心理念就好了, 沒必要研究每一行代碼哈
private?void?renewExpiration()?{ExpirationEntry?ee?=?EXPIRATION_RENEWAL_MAP.get(getEntryName());if?(ee?==?null)?{return;}Timeout?task?=?commandExecutor.getConnectionManager().newTimeout(new?TimerTask()?{@Overridepublic?void?run(Timeout?timeout)?throws?Exception?{ExpirationEntry?ent?=?EXPIRATION_RENEWAL_MAP.get(getEntryName());if?(ent?==?null)?{return;}Long?threadId?=?ent.getFirstThreadId();if?(threadId?==?null)?{return;}RFuture<Boolean>?future?=?renewExpirationAsync(threadId);future.onComplete((res,?e)?->?{if?(e?!=?null)?{log.error("Can't?update?lock?"?+?getName()?+?"?expiration",?e);return;}if?(res)?{//?調(diào)用本身renewExpiration();}});}},?internalLockLeaseTime?/?3,?TimeUnit.MILLISECONDS);ee.setTimeout(task); }解鎖操作
解鎖時的操作相對加鎖還是比較簡單的
@Override public?void?unlock()?{try?{get(unlockAsync(Thread.currentThread().getId()));}?catch?(RedisException?e)?{if?(e.getCause()?instanceof?IllegalMonitorStateException)?{throw?(IllegalMonitorStateException)?e.getCause();}?else?{throw?e;}} }解鎖成功后會將之前的"看門狗" Timeout 續(xù)時取消, 并返回成功
@Override public?RFuture<Void>?unlockAsync(long?threadId)?{RPromise<Void>?result?=?new?RedissonPromise<Void>();RFuture<Boolean>?future?=?unlockInnerAsync(threadId);future.onComplete((opStatus,?e)?->?{//?取消自動續(xù)時功能cancelExpirationRenewal(threadId);if?(e?!=?null)?{//?失敗result.tryFailure(e);return;}if?(opStatus?==?null)?{IllegalMonitorStateException?cause?=?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"+?id?+?"?thread-id:?"?+?threadId);result.tryFailure(cause);return;}//?解鎖成功result.trySuccess(null);});return?result; }又是一個精髓點, 解鎖的 Lua 腳本定義
protected?RFuture<Boolean>?unlockInnerAsync(long?threadId)?{return?evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,"if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then?"?+"return?nil;"?+"end;?"?+"local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);?"?+"if?(counter?>?0)?then?"?+"redis.call('pexpire',?KEYS[1],?ARGV[2]);?"?+"return?0;?"?+"else?"?+"redis.call('del',?KEYS[1]);?"?+"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+"return?1;?"?+"end;?"?+"return?nil;",Arrays.asList(getName(),?getChannelName()),?LockPubSub.UNLOCK_MESSAGE,?internalLockLeaseTime,?getLockName(threadId)); }還是來張圖理解哈, Lua 腳本會詳細(xì)分析
解鎖 Lua
老規(guī)矩, 圖片加參數(shù)說明
KEYS[1]: myLock
KEYS[2]: redisson_lock_channel:{myLock}
ARGV[1]: 0
ARGV[2]: 360000... (過期時間)
ARGV[3]: 7f0c54e2...(Hash 中的鎖 Key)
#?判斷?KEYS[1]?中是否存在?ARGV[3] if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then return?nil; end; #?將?KEYS[1]?中?ARGV[3]?Val?-?1 local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1); #?如果返回大于0?證明是一把重入鎖 if?(counter?>?0)?then#?重制過期時間redis.call('pexpire',?KEYS[1],?ARGV[2]); return?0; else#?刪除?KEYS[1]redis.call('del',?KEYS[1]);#?通知阻塞等待線程或進(jìn)程資源可用redis.call('publish',?KEYS[2],?ARGV[1]); return?1; end; return?nil;Redlock 算法
不可否認(rèn), Redisson 設(shè)計的分布式鎖真的很 NB, 但是還是沒有解決 主從節(jié)點下異步同步數(shù)據(jù)導(dǎo)致鎖丟失問題
所以 Redis 作者 Antirez 推出 紅鎖算法, 這個算法的精髓就是: 沒有從節(jié)點, 如果部署多臺 Redis, 各實例之間相互獨立, 不存在主從復(fù)制或者其他集群協(xié)調(diào)機(jī)制
如何使用
創(chuàng)建多個 Redisson Node, 由這些無關(guān)聯(lián)的 Node 組成一個完整的分布式鎖
public?static?void?main(String[]?args)?{String?lockKey?=?"myLock";Config?config?=?new?Config();config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6379");Config?config2?=?new?Config();config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6380");Config?config3?=?new?Config();config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6381");RLock?lock?=?Redisson.create(config).getLock(lockKey);RLock?lock2?=?Redisson.create(config2).getLock(lockKey);RLock?lock3?=?Redisson.create(config3).getLock(lockKey);RedissonRedLock?redLock?=?new?RedissonRedLock(lock,?lock2,?lock3);try?{redLock.lock();}?finally?{redLock.unlock();} }當(dāng)然, 對于 Redlock 算法不是沒有質(zhì)疑聲, 大家可以去 Redis 官網(wǎng)查看Martin Kleppmann 與 Redis 作者Antirez 的辯論
CAP 原則之間的取舍
CAP 原則又稱 CAP 定理, 指的是在一個分布式系統(tǒng)中, ?Consistency(一致性)、 Availability(可用性)、Partition tolerance(分區(qū)容錯性), 三者不可得兼
一致性(C) : 在分布式系統(tǒng)中的所有數(shù)據(jù)備份, 在同一時刻是否同樣的值(等同于所有節(jié)點訪問同一份最新的數(shù)據(jù)副本)
可用性(A): 在集群中一部分節(jié)點故障后, 集群整體是否還能響應(yīng)客戶端的讀寫請求(對數(shù)據(jù)更新具備高可用性)
分區(qū)容忍性(P): 以實際效果而言, 分區(qū)相當(dāng)于對通信的時限要求. 系統(tǒng)如果不能在時限內(nèi)達(dá)成數(shù)據(jù)一致性, 就意味著發(fā)生了分區(qū)的情況, 必須就當(dāng)前操作在 C 和 A 之間做出選擇
分布式鎖選型
如果要滿足上述分布式鎖之間的強(qiáng)一致性, 可以采用 Zookeeper 的分布式鎖, 因為它底層的 ZAB協(xié)議(原子廣播協(xié)議), 天然滿足 CP
但是這也意味著性能的下降, 所以不站在具體數(shù)據(jù)下看 Redis 和 Zookeeper, 代表著性能和一致性的取舍
如果項目沒有強(qiáng)依賴 ZK, 使用 Redis 就好了, 因為現(xiàn)在 Redis 用途很廣, 大部分項目中都引用了 Redis
沒必要對此再引入一個新的組件, 如果業(yè)務(wù)場景對于 Redis 異步方式的同步數(shù)據(jù)造成鎖丟失無法忍受, 在業(yè)務(wù)層處理就好了
往期推薦Redis為什么變慢了?一文詳解Redis性能問題 | 萬字長文
Redis 消息隊列的三種方案(List、Streams、Pub/Sub)
硬核Redis總結(jié),看這篇就夠了!
總結(jié)
以上是生活随笔為你收集整理的一文掌握Redisson分布式锁原理|干货推荐的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java LinkedList void
- 下一篇: 线程安全问题的 3 种解决方案!