Redisson分布式锁分析
2019獨角獸企業重金招聘Python工程師標準>>>
RedissonLock繼承結構:
RLock接口中定義的方法:主要分析tryLock()實現。
**(一) RedissonLock#tryLock:**加鎖邏輯
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);// 等待時間long current = System.currentTimeMillis();final long threadId = Thread.currentThread().getId();// 使用當前線程ID,實現重入鎖Long ttl = tryAcquire(leaseTime, unit, threadId);// 嘗試獲取鎖// lock acquiredif (ttl == null) {// 沒有過期時間,未上鎖,直接返回獲取成功return true;}time -= (System.currentTimeMillis() - current);if (time <= 0) {// 等待時間已超時acquireFailed(threadId);return false;}current = System.currentTimeMillis();// 訂閱鎖的隊列,等待鎖被其余線程釋放后通知// 通過Redis的Channel訂閱監聽隊列,subscribe內部通過信號量semaphore,再通過await方法阻塞,內部其實是用CountDownLatch來實現阻塞,獲取subscribe異步執行的結果,來保證訂閱成功final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {// 阻塞等待subscribe的future的結果對象(即分布式解鎖消息的結果)if (!subscribeFuture.cancel(false)) {subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {@Overridepublic void operationComplete(Future<RedissonLockEntry> future) throws Exception {if (subscribeFuture.isSuccess()) {unsubscribe(subscribeFuture, threadId);}}});}acquireFailed(threadId);return false;}try {time -= (System.currentTimeMillis() - current);if (time <= 0) {// subscribe方法調用超時,取消訂閱,不再繼續申請鎖acquireFailed(threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(leaseTime, unit, threadId);// 再次嘗試申請鎖if (ttl == null) {// 獲得鎖,返回return true;}time -= (System.currentTimeMillis() - currentTime);if (time <= 0) {// 超時acquireFailed(threadId);return false;}// waiting for message 等待訂閱的隊列消息currentTime = System.currentTimeMillis();// 通過信號量(共享鎖)阻塞,等待解鎖消息(這一點設計的非常精妙:減少了其他分布式節點的等待或者空轉等無效鎖申請的操作,整體提高了性能)if (ttl >= 0 && ttl < time) {// 在ttl內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// 在等待剩余時間內...getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= (System.currentTimeMillis() - currentTime);if (time <= 0) {// 超時acquireFailed(threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);// 無論是否獲得鎖,都要取消訂閱解鎖消息} // return get(tryLockAsync(waitTime, leaseTime, unit)); } ## RedissonLock#getEntry -> RedissonLock#getEntryName protected String getEntryName() {return id + ":" + getName();// 客戶端實例ID+鎖名稱來保證多個實例下的鎖可重入 }0x0001:**RedissonLock#tryAcquire:**嘗試獲取鎖
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId)); } ## RedissonObject#get protected final CommandAsyncExecutor commandExecutor; protected final <V> V get(RFuture<V> future) {return commandExecutor.get(future); }0x0010:**RedissonLock#tryAcquireAsync:**異步嘗試獲取鎖,異步調用,get方法阻塞
## RedissonLock#tryAcquireAsync private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {// 起租時間不為空return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// lockWatchdogTimeout = 30 * 1000默認30秒RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();if (ttlRemaining == null) {// 拿到鎖scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture; } ## RedissonLock#scheduleExpirationRenewal Redisson避免死鎖方案,避免鎖未被釋放。 ## 若未設置過期時間的話,redission默認的過期時間是30s,同時未避免鎖在業務未處理完成之前被提前釋放,Redisson在獲取到鎖且默認過期時間的時候,會在當前客戶端內部啟動一個定時任務,每隔internalLockLeaseTime/3的時間去刷新key的過期時間,這樣既避免了鎖提前釋放,同時如果客戶端宕機的話,這個鎖最多存活30s的時間就會自動釋放 if (expirationRenewalMap.containsKey(getEntryName())) {return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal(threadId);}}});} }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel(); }0x0011:**CommandAsyncService#get:**包裝了CountDownLatch是為了支持線程可中斷操作
public <V> V get(RFuture<V> future) {if (!future.isDone()) {// Task還沒執行完成final CountDownLatch l = new CountDownLatch(1);// 設置一個單線程的同步控制器future.addListener(new FutureListener<V>() {@Overridepublic void operationComplete(Future<V> future) throws Exception {l.countDown();// 操作完成}});boolean interrupted = false;while (!future.isDone()) {try {l.await();} catch (InterruptedException e) {interrupted = true;break;}}if (interrupted) {Thread.currentThread().interrupt();}}// commented out due to blocking issues up to 200 ms per minute for each thread// future.awaitUninterruptibly();if (future.isSuccess()) {return future.getNow();}throw convertException(future); }0x0100:**RedissonLock#tryLockInnerAsync:**真正執行的是一段具有原子性的Lua腳本,并且最終也是由CommandAsynExecutor去執行。使用Lua的好處:Redis會將整個腳本作為一個整體執行,中間不會被其他命令插入。因此在編寫腳本的過程中無需擔心會出現競態條件,無需使用事務。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);// 鎖過期時間-毫秒// getName() - 邏輯鎖名稱:比如方法名+名稱return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"Lua腳本",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } ## RedissonLock#getLockName 鎖對應的線程級別的名稱,支持相同線程可重入,不同線程不可重入 protected String getLockName(long threadId) {return id + ":" + threadId; }????鎖的實際存儲類型是hash。KEY[1]->邏輯鎖名稱 ARGV[2]->線程級別的鎖名稱
if (redis.call('exists', KEYS[1]) == 0) thenredis.call('hset', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) thenredis.call('hincrby', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil; end; return redis.call('pttl', KEYS[1]);????分析這段Lua腳本:
- 檢查鎖名稱是否存在,如果不存在,獲取成功,同時設置線程級別鎖名稱,設置過期時間為internalLockLeaseTime
- 如果檢查存在KEYS[1], ARGV[2],獲取成功,自增1,記錄重入的次數,更新過期時間
- 如果key不存在,直接返回key的剩余過期時間
0x0101:**RedissonLock#acquireFailed:**把該線程從獲取鎖操作的等待隊列中直接刪掉
private void acquireFailed(long threadId) {get(acquireFailedAsync(threadId)); } ## RedissonFairLock#acquireFailedAsync @Override protected RFuture<Void> acquireFailedAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,"local firstThreadId = redis.call('lindex', KEYS[1], 0); " + "if firstThreadId == ARGV[1] then " + "local keys = redis.call('zrange', KEYS[2], 0, -1); " + "for i = 1, #keys, 1 do " + "redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), keys[i]);" + "end;" + "end;" +"redis.call('zrem', KEYS[2], ARGV[1]); " +"redis.call('lrem', KEYS[1], 0, ARGV[1]); ",Arrays.<Object>asList(threadsQueueName, timeoutSetName), getLockName(threadId), threadWaitTime); }0x0110:**PublishSubscribe#subscribe:**訂閱消息
final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>(); // 根據channelName拿到信號量,channelName=UUID+":"+name,對應一個鎖 final AsyncSemaphore semaphore = subscribeService.getSemaphore(new ChannelName(channelName)); final RPromise<E> newPromise = new RedissonPromise<E>() {@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return semaphore.remove(listenerHolder.get());} }; Runnable listener = new Runnable() {@Overridepublic void run() {E entry = entries.get(entryName);if (entry != null) {entry.aquire();semaphore.release();entry.getPromise().addListener(new TransferListener<E>(newPromise));return;}E value = createEntry(newPromise);value.aquire();E oldValue = entries.putIfAbsent(entryName, value);if (oldValue != null) {oldValue.aquire();semaphore.release();oldValue.getPromise().addListener(new TransferListener<E>(newPromise));return;}RedisPubSubListener<Object> listener = createListener(channelName, value);subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);} }; // 把生成的監聽線程listenser加入到信號量的監聽集合中去,后面發布解鎖消息的時候,會喚醒 semaphore.acquire(listener); listenerHolder.set(listener); return newPromise;**(二) RedissonLock#unlock:**解鎖邏輯
@Override public void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {...} }0x0001:**RedissonLock#unlockAsync:**異步解鎖
@Override public RFuture<Void> unlockAsync(final long threadId) {final RPromise<Void> result = new RedissonPromise<Void>();RFuture<Boolean> future = unlockInnerAsync(threadId);// 此處Redis實現future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {if (!future.isSuccess()) {cancelExpirationRenewal(threadId);result.tryFailure(future.cause());return;}Boolean opStatus = future.getNow();if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}if (opStatus) {// 解鎖成功之后取消更新鎖expire的時間任務,針對于沒有鎖過期時間的cancelExpirationRenewal(null);}result.trySuccess(null);}});return result; }0x0010:**RedissonLock#unlockInnerAsync:**當其他線程釋放鎖的時候,會同時根據鎖的唯一通道publish一條分布式的解鎖信息,接收到分布式消息后, 等待獲取鎖的Semaphore中的監聽隊列中的listenser線程可重新申請鎖。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"Lua腳本",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }????拿出來Lua看看如何實現的:KEY[1]->邏輯鎖名稱 KEY[2]->getChannelName() ARGV[1]->0L ARGV[2]->過期時間 ARGV[3]->線程級別的鎖名稱
if (redis.call('exists', KEYS[1]) == 0) thenredis.call('publish', KEYS[2], ARGV[1]);return 1; end; if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) thenreturn nil; end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) thenredis.call('pexpire', KEYS[1], ARGV[2]);return 0; elseredis.call('del', KEYS[1]);redis.call('publish', KEYS[2], ARGV[1]);return 1; end; return nil;????分析這段Lua腳本:
- 如果鍵不存在,說明鎖可以用了,發布鎖釋放消息后返回
- 如果鎖不是被當前線程鎖定,則返回nil
- Redisson支持重入,在解鎖的時候,引用數減1
- 如果重入數>0,重新設置過期時間
- 如果重入數>=0,說明鎖可以使用了,發布鎖釋放消息后返回
**(三) LockPubSub#unlockMessage:**處理解鎖消息
Redisson在LockPubSub中處理解鎖消息,首先看一下LockPubSub繼承結構:
PublishSubscribe#createListener
// 模板方法,提供給子類實現 protected abstract void onMessage(E value, Long message);private RedisPubSubListener<Object> createListener(final String channelName, final E value) {RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {@Overridepublic void onMessage(CharSequence channel, Object message) {if (!channelName.equals(channel.toString())) {return;}PublishSubscribe.this.onMessage(value, (Long)message);// 此處會調用到LockPubSub.onMessage}...};return listener; }**LockPubSub:**解鎖消息
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {public static final Long unlockMessage = 0L;// 解鎖消息,redis執行lua返回值public static final Long readUnlockMessage = 1L;@Overrideprotected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {return new RedissonLockEntry(newPromise);}@Overrideprotected void onMessage(RedissonLockEntry value, Long message) {if (message.equals(unlockMessage)) {// 解鎖消息Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}// 釋放一個,喚醒等待的entry.getLatch().tryAcquire去再次嘗試獲取鎖value.getLatch().release();} else if (message.equals(readUnlockMessage)) {while (true) {// 如果還有其他Listeners回調,也喚醒執行Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute == null) {break;}runnableToExecute.run();}value.getLatch().release(value.getLatch().getQueueLength());}} }轉載于:https://my.oschina.net/javamaster/blog/3006215
總結
以上是生活随笔為你收集整理的Redisson分布式锁分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: NLPIR智能语义技术从采集到分析一步到
- 下一篇: Vue(五)Vue规范