redisson 看门狗_Redisson的分布式锁
生活随笔
收集整理的這篇文章主要介紹了
redisson 看门狗_Redisson的分布式锁
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
最近想使用redisson的分布式鎖去替換系統中的redis分布式鎖從而解決續期問題,查看了源碼,發現其原理還是比較容易理解的。
一、Maven配置
<dependency> <groupId>org.redissongroupId> <artifactId>redissonartifactId> <version>3.13.4version> dependency>二、Springboot定義配置類
@Configurationpublic class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String password; @Bean public RedissonClient redissonClient() { Config config = new Config(); // config.useClusterServers().addNodeAddress("redis://" + host + ":" + port); // 分片集群方式 SingleServerConfig server = config.useSingleServer(); config.setLockWatchdogTimeout(5 * 1000L); server.setAddress("redis://" + host + ":" + port); server.setPassword(password); RedissonClient redissonClient = Redisson.create(config); return redissonClient; }}三、API
RedissionClient交互于Redis和Java。其常用的實現類為Redisson,上鎖/解鎖操作的API也很簡單:
RLock lock = redissonClient.getLock("鎖的key");lock.lock();lock.unLock();四、源碼解讀
redissionClient.getLock(“鎖的名稱”);本質上是創建了一個RLock。
RLock lock =new RedissonLock(connectionManager.getCommandExecutor(), name);1、加鎖RLock.lock
底層進入lock(鎖的有效時間,時間單位,是否中斷)方法
/** * * @param leaseTime 鎖的有效期 * @param unit 時間單位 * @param interruptibly 中斷標識位 * @throws InterruptedException */private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); // 嘗試獲取鎖的邏輯,返回值為表明redis中此鎖還剩余的有效時長 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // 如果鎖的有效時間為空,證明上鎖成功 if (ttl == null) { return; } RFuture future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } /** * 自旋的方式重試獲取鎖 */ try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // 如果鎖的有效時間為空,證明上鎖成功 if (ttl == null) { break; } // future.getNow().getLatch() 底層返回一個信號量Semaphore // 在ttl的時間內去嘗試獲取許可 // 獲取不到則阻塞等待信號量的釋放或者ttl之后再去執行下面代碼===> sleep(ttl) if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); }}那我們看看核心方法tryAcquire是干什么的
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}/** * * @param waitTime 等待時長 * @param leaseTime 鎖的時長 * @param unit 時間單位 * @param threadId 線程ID * @param * @return */private RFuturetryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { // 如果參數中設置了鎖的時長則直接通過lua腳本去嘗試創建redis中的節點,并設置時長 return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 未設置時長的情況下,使用看門狗配置的時長;lua腳本設置redis節點 RFuture ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // 返回值為空,表明設置成功,則使用看門狗機制為鎖續期 if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture;}分布式鎖獲取的lua腳本
RFuturetryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}????續期
/** * 定時續期操作 * @param threadId */private void scheduleExpirationRenewal(long threadId) { // 新建包裝續期操作的任務實體 ExpirationEntry entry = new ExpirationEntry(); // 放入實體map ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { // 設置線程ID oldEntry.addThreadId(threadId); } else { // 設置線程ID entry.addThreadId(threadId); // 續期 renewExpiration(); }}/** * 續期 */private void renewExpiration() { // 從任務實體map中獲取本次任務實體 ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 封裝本次任務, 定時為看門狗配置時間/3 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 判斷鎖是否還在 RFuture future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // 重新激活任務 renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 任務實體設置任務 ee.setTimeout(task);}判斷鎖是否還在的lua腳本
protected RFuturerenewExpirationAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}2、解鎖RLock.unLock
/** * 同步解鎖 * @param threadId 線程ID * @return */public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); // lua腳本刪除redis中的節點 RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 取消任務的續期 cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result;}lua腳本刪除redis中的節點
protected RFutureunlockInnerAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }取消任務續期
void cancelExpirationRenewal(Long threadId) { // 獲取任務實體 ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null) { return; } if (threadId != null) { // 任務取消線程的綁定 task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { // 取消任務 timeout.cancel(); } // 移除任務實體 EXPIRATION_RENEWAL_MAP.remove(getEntryName()); }}五、流程圖
加鎖原理
解鎖原理
六、思考
Redisson在獲取不到鎖的情況下,默認是一直阻塞自旋的,如果業務中不想一直等待,該如何處理呢?
其實很簡單,我們只要通過反射調用它嘗試獲取鎖的方法,從而規避自旋部分即可。
/** * 返回獲取鎖的狀態,true表示上鎖成功 * * @param lockKey * @return */public boolean lockBackState(String lockKey) { try { RLock lock = redissonClient.getLock(lockKey); RedissonLock l = (RedissonLock) lock; Method method = l.getClass().getDeclaredMethod("tryAcquire", long.class, long.class, TimeUnit.class, long.class); method.setAccessible(true); Object invoke = method.invoke(l, -1L, -1L, null, Thread.currentThread().getId()); return invoke == null; } catch (Exception e) { e.printStackTrace(); return false; }}歡迎大家和帝都的雁積極互動,頭腦交流會比個人埋頭苦學更有效!共勉!
CSDN:https://blog.csdn.net/yxh13521338301
總結
以上是生活随笔為你收集整理的redisson 看门狗_Redisson的分布式锁的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: rabbitmq如何保证消息不丢失_Ra
- 下一篇: python下载包管理器_Python包