Redis进阶- Redisson分布式锁实现原理及源码解析
文章目錄
- Pre
- 用法
- Redisson分布式鎖實現原理
- Redisson分布式鎖源碼分析
- redisson.getLock(lockKey) 的邏輯
- redissonLock.lock()的邏輯
- redissonLock.unlock();邏輯
- 總結
Pre
Redis進階-細說分布式鎖中我們梳理了使用Redis實現分布式鎖的演進過程,并提出了目前最完善的解決方案:Redisson 實現分布式鎖 。
這里我們來分析下Redisson分布式鎖實現原理及源碼解析
用法
使用redisson實現分布式鎖的操作步驟,三部曲
- 第一步: 獲取鎖 RLock redissonLock = redisson.getLock(lockKey);
- 第二步: 加鎖,實現鎖續命功能 redissonLock.lock();
- 第三步:釋放鎖 redissonLock.unlock();
Redisson分布式鎖實現原理
熟悉了基本用法以后,我們來看下Redission實現分布式鎖的原理,再理解了原理之后,后續梳理源碼實現就更加得心應手了。
Redisson分布式鎖源碼分析
流程圖如下
重點主要是依賴lua腳本的原子性,實現加鎖和釋放鎖的功能
redisson.getLock(lockKey) 的邏輯
@Overridepublic RLock getLock(String name) {return new RedissonLock(connectionManager.getCommandExecutor(), name);}實例化RedissonLock,我們看下RedissonLock的構造函數
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.id = commandExecutor.getConnectionManager().getId();this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();}-
super(commandExecutor, name); 父類name賦值,后續通過getName()獲取
-
commandExecutor: 執行lua腳本的executor
-
id 是個UUID, 后面被用來當做 和threadId組成 value值,用作判斷加鎖和釋放鎖是否是同一個線程的校驗。
-
internalLockLeaseTime : 取自 Config#lockWatchdogTimeout,默認30秒,這個參數還有另外一個作用,鎖續命的執行周期 internalLockLeaseTime/3 = 10秒
redissonLock.lock()的邏輯
主要是實現加鎖和鎖的續命
redissonLock.lock();看看都干了啥
@Overridepublic void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}繼續看 lockInterruptibly
@Overridepublic void lockInterruptibly() throws InterruptedException {lockInterruptibly(-1, null);}繼續看 lockInterruptibly(-1, null);
@Overridepublic void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {// 獲取當前線程IDlong threadId = Thread.currentThread().getId();// 嘗試獲取鎖的剩余時間 Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquired ttl為空,說明沒有線程持有該鎖,直接返回 讓當前線程加鎖成功 if (ttl == null) {return;}RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);// 死循環 try {while (true) {// 再此嘗試獲取鎖的剩余時間 ,如果為null, 跳出循環ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for message 如果ttl >=0 說明 有其他線程持有該鎖if (ttl >= 0) {// 獲取信號量,嘗試加鎖,設置最大等待市場為ttlgetEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// 如果ttl小于0 (-1 ,-2 ) 說明已經過期,直接獲取getEntry(threadId).getLatch().acquire();}}} finally {unsubscribe(future, threadId);} // get(lockAsync(leaseTime, unit));}大流程已經梳理完了,我們看下 Long ttl = tryAcquire(leaseTime, unit, threadId);
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId));}繼續看下
tryAcquireAsync(leaseTime, unit, threadId) private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 剛開始 leaseTime 傳入的是 -1 ,所以走這個分支// 1)嘗試加鎖 待會細看 先把主要的邏輯梳理完RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);// 2) 注冊監聽事件ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();// lock acquiredif (ttlRemaining == null) {// 3)獲取鎖成功的話,給鎖延長過期時間 scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}繼續看
// 1)嘗試加鎖 待會細看 先把主要的邏輯梳理完RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);看實現
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}lua 腳本
KEYS[1] ---------> getName()
ARGV[1] ---------> internalLockLeaseTime
ARGV[2] ---------> getLockName(threadId) 實現如下
這個id就是自開始實例化RedissonLock的id ,是個UUID
我們來解釋下這段lua腳本
// 如果 lockKey不存在 ,設置 使用hset設置 lockKey ,field為 uuid:threadId ,value為1 ,并設置過期時間//就是這個命令 //127.0.0.1:6379> hset lockkey uuid:threadId 1//(integer) 1//127.0.0.1:6379> PEXPIRE lockkey internalLockLeaseTime"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果 lockKey 存在和 filed 和 當前線程的uuid:threadId相同 key 加1 ,執行多少次 就加多次 設置過期時間 其實就是如下命令//127.0.0.1:6379> HEXISTS lockkey uuid:threadId//(integer) 1//127.0.0.1:6379> PEXPIRE lockkey internalLockLeaseTime"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 最后返回 lockkey的 pttl "return redis.call('pttl', KEYS[1]);"那繼續監聽時間中的 scheduleExpirationRenewal(threadId); 邏輯
private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {// 重點是run方法 @Overridepublic void run(Timeout timeout) throws Exception {// 又是lua腳本 判斷是否存在,存在就調用pexpire RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));// 監聽事件中又 調用了自己 scheduleExpirationRenewalfuture.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}}redissonLock.unlock();邏輯
@Overridepublic void unlock() {Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));if (opStatus == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + Thread.currentThread().getId());}if (opStatus) {cancelExpirationRenewal();}}重點看 unlockInnerAsync(Thread.currentThread().getId())
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}又是lua腳本,核心就是 把value減到為0 ,刪除key
KEYS[1] ---------> getName()
KEYS[2] ---------> getChannelName()
ARGV[1] ---------> LockPubSub.unlockMessage
ARGV[2] ---------> internalLockLeaseTime
ARGV[2] ---------> getLockName(threadId)
總結
需要用到續鎖功能時,一要記住不要設置鎖的過期時間,可以設置成-1.
一旦設了時間,RedissonLock就會認為你需要自己控制鎖時間,而放棄執行續鎖邏輯。
查看源碼, 續鎖邏輯需要起定時器。所以要注意這點,并不是所有分布式場景都需要續鎖邏輯的。當我們很難判斷業務邏輯的執行時間時,不妨開啟續鎖。
至此,原理和源碼我們粗略的梳理完了 ,梳理了主要的核心流程,主要是依靠lua腳本,代碼寫的還是非常優秀的,向開源學習!!!
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Redis进阶- Redisson分布式锁实现原理及源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Redis进阶-lua脚本
- 下一篇: Redis进阶-5.x 单节点 及Re