聊聊分布式锁——Redis和Redisson的方式
聊聊分布式鎖——Redis和Redisson的方式
一、什么是分布式鎖
分布式~~鎖,要這么念,首先得是『分布式』,然后才是『鎖』
分布式:這里的分布式指的是分布式系統,涉及到好多技術和理論,包括CAP 理論、分布式存儲、分布式事務、分布式鎖…
分布式系統是由一組通過網絡進行通信、為了完成共同的任務而協調工作的計算機節點組成的系統。
分布式系統的出現是為了用廉價的、普通的機器完成單個計算機無法完成的計算、存儲任務。其目的是利用更多的機器,處理更多的數據。
鎖:對對,就是你想的那個,Javer 學的第一個鎖應該就是 synchronized
Java 初級面試問題,來拼寫下 賽克瑞納挨日的
從鎖的使用場景有來看下邊這 3 種鎖:
線程鎖:synchronized 是用在方法或代碼塊中的,我們把它叫『線程鎖』,線程鎖的實現其實是靠線程之間共享內存實現的,說白了就是內存中的一個整型數,有空閑、上鎖這類狀態,比如 synchronized 是在對象頭中的 Mark Word 有個鎖狀態標志,Lock 的實現類大部分都有個叫 volatile int state 的共享變量來做狀態標志。
進程鎖:為了控制同一操作系統中多個進程訪問某個共享資源,因為進程具有獨立性,各個進程無法訪問其他進程的資源,因此無法通過 synchronized 等線程鎖實現進程鎖。比如說,我們的同一個 linux 服務器,部署了好幾個 Java 項目,有可能同時訪問或操作服務器上的相同數據,這就需要進程鎖,一般可以用『文件鎖』來達到進程互斥。
分布式鎖:隨著用戶越來越多,我們上了好多服務器,原本有個定時給客戶發郵件的任務,如果不加以控制的話,到點后每臺機器跑一次任務,客戶就會收到 N 條郵件,這就需要通過分布式鎖來互斥了。
書面解釋:分布式鎖是控制分布式系統或不同系統之間共同訪問共享資源的一種鎖實現,如果不同的系統或同一個系統的不同主機之間共享了某個資源時,往往需要互斥來防止彼此干擾來保證一致性。
知道了什么是分布式鎖,接下來就到了技術選型環節
二、分布式鎖要怎么搞
要實現一個分布式鎖,我們一般選擇集群機器都可以操作的外部系統,然后各個機器都去這個外部系統申請鎖。
這個外部系統一般需要滿足如下要求才能勝任:
互斥:在任意時刻,只能有一個客戶端能持有鎖。
防止死鎖:即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。所以鎖一般要有一個過期時間。
獨占性:解鈴還須系鈴人,加鎖和解鎖必須是同一個客戶端,一把鎖只能有一把鑰匙,客戶端自己的鎖不能被別人給解開,當然也不能去開別人的鎖。
容錯:外部系統不能太“脆弱”,要保證外部系統的正常運行,客戶端才可以加鎖和解鎖。
我覺得可以這么類比:
好多商販要租用某個倉庫,同一時刻,只能給一個商販租用,且只能有一把鑰匙,還得有固定的“租期”,到期后要回收的,當然最重要的是倉庫門不能壞了,要不鎖都鎖不住。這不就是分布式鎖嗎?
感慨自己真是個愛技術愛生活的程序猿~~
其實鎖,本質上就是用來進行防重操作的(數據一致性),像查詢這種冪等操作,就不需要費這勁
直接上結論:
分布式鎖一般有三種實現方式:1. 數據庫樂觀鎖;2. 基于 Redis 的分布式鎖;3. 基于 ZooKeeper 的分布式鎖。
但為了追求更好的性能,我們通常會選擇使用 Redis 或 Zookeeper 來做。
想必也有喜歡問為什么的同學,那數據庫客觀鎖怎么就性能不好了?
使用數據庫樂觀鎖,包括主鍵防重,版本號控制。但是這兩種方法各有利弊。
使用主鍵沖突的策略進行防重,在并發量非常高的情況下對數據庫性能會有影響,尤其是應用數據表和主鍵沖突表在一個庫的時候,表現更加明顯。還有就是在 MySQL 數據庫中采用主鍵沖突防重,在大并發情況下有可能會造成鎖表現象,比較好的辦法是在程序中生產主鍵進行防重。
使用版本號策略
這個策略源于 MySQL 的 MVCC 機制,使用這個策略其實本身沒有什么問題,唯一的問題就是對數據表侵入較大,我們要為每個表設計一個版本號字段,然后寫一條判斷 SQL 每次進行判斷。
第三趴,編碼
三、基于 Redis 的分布式鎖
其實 Redis 官網已經給出了實現:https://redis.io/topics/distlock,說各種書籍和博客用了各種手段去用 Redis 實現分布式鎖,建議用 Redlock 實現,這樣更規范、更安全。我們循序漸進來看
我們默認指定大家用的是 Redis 2.6.12 及更高的版本,就不再去講 setnx、expire 這種了,直接 set 命令加鎖
set key value[expiration EX seconds|PX milliseconds] [NX|XX]eg:
SET resource_name my_random_value NX PX 30000SET 命令的行為可以通過一系列參數來修改
EX second :設置鍵的過期時間為 second 秒。SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :設置鍵的過期時間為 millisecond 毫秒。SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
NX :只在鍵不存在時,才對鍵進行設置操作。SET key value NX 效果等同于 SETNX key value 。
XX :只在鍵已經存在時,才對鍵進行設置操作。
這條指令的意思:當 key——resource_name 不存在時創建這樣的key,設值為 my_random_value,并設置過期時間 30000 毫秒。
別看這干了兩件事,因為 Redis 是單線程的,這一條指令不會被打斷,所以是原子性的操作。
Redis 實現分布式鎖的主要步驟:
指定一個 key 作為鎖標記,存入 Redis 中,指定一個 唯一的標識 作為 value。
當 key 不存在時才能設置值,確保同一時間只有一個客戶端進程獲得鎖,滿足 互斥性 特性。
設置一個過期時間,防止因系統異常導致沒能刪除這個 key,滿足 防死鎖 特性。
當處理完業務之后需要清除這個 key 來釋放鎖,清除 key 時需要校驗 value 值,需要滿足 解鈴還須系鈴人 。
設置一個隨機值的意思是在解鎖時候判斷 key 的值和我們存儲的隨機數是不是一樣,一樣的話,才是自己的鎖,直接 del 解鎖就行。
當然這個兩個操作要保證原子性,所以 Redis 給出了一段 lua 腳本(Redis 服務器會單線程原子性執行 lua 腳本,保證 lua 腳本在處理的過程中不會被任意其它請求打斷。):
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end問題:
我們先拋出兩個問題思考:
獲取鎖時,過期時間要設置多少合適呢?
預估一個合適的時間,其實沒那么容易,比如操作資源的時間最慢可能要 10 s,而我們只設置了 5 s 就過期,那就存在鎖提前過期的風險。這個問題先記下,我們先看下 Javaer 要怎么在代碼中用 Redis 鎖。
容錯性如何保證呢?
Redis 掛了怎么辦,你可能會說上主從、上集群,但也會出現這樣的極端情況,當我們上鎖后,主節點就掛了,這個時候還沒來的急同步到從節點,主從切換后鎖還是丟了
帶著這兩個問題,我們接著看
Redisson 實現代碼
redisson 是 Redis 官方的分布式鎖組件。GitHub 地址:https://github.com/redisson/redisson
Redisson 是一個在 Redis 的基礎上實現的 Java 駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的 Java 常用對象,還實現了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分布式服務。Redisson 提供了使用 Redis 的最簡單和最便捷的方法。Redisson 的宗旨是促進使用者對 Redis 的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
redisson 現在已經很強大了,github 的 wiki 也很詳細,分布式鎖的介紹直接戳 Distributed locks and synchronizers
Redisson 支持單點模式、主從模式、哨兵模式、集群模式,只是配置的不同,我們以單點模式來看下怎么使用,代碼很簡單,都已經為我們封裝好了,直接拿來用就好,詳細的demo,我放在了 github: starfish-learn-redisson 上,這里就不一步步來了
RLock lock = redisson.getLock("myLock");RLock 提供了各種鎖方法,我們來解讀下這個接口方法,
注:代碼為 3.16.2 版本,可以看到繼承自 JDK 的 Lock 接口,和 Reddsion 的異步鎖接口 RLockAsync(這個我們先不研究)
RLock
Demo
就是這么簡單,Redisson 已經做好了封裝,使用起來 so easy,如果使用主從、哨兵、集群這種也只是配置不同。
原理
看源碼小 tips,最好是 fork 到自己的倉庫,然后拉到本地,邊看邊注釋,然后提交到自己的倉庫,也方便之后再看,不想這么麻煩的,也可以直接看我的 Jstarfish/redisson
先看下 RLock 的類關系
跟著源碼,可以發現 RedissonLock 是 RLock 的直接實現,也是我們加鎖、解鎖操作的核心類
加鎖
主要的加鎖方法就下邊這兩個,區別也很簡單,一個有等待時間,一個沒有,所以我們挑個復雜的看(源碼包含了另一個的絕大部分)
RedissonLock.tryLock
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 獲取等鎖的最長時間 long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); //取得當前線程id(判斷是否可重入鎖的關鍵) long threadId = Thread.currentThread().getId(); // 【核心點1】嘗試獲取鎖,若返回值為null,則表示已獲取到鎖,返回的ttl就是key的剩余存活時間 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { return true; } // 還可以容忍的等待時長 = 獲取鎖能容忍的最大等待時長 - 執行完上述操作流程的時間 time -= System.currentTimeMillis() - current; if (time <= 0) { //等不到了,直接返回失敗 acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); /** * 【核心點2】 * 訂閱解鎖消息 redisson_lock__channel:{$KEY},并通過await方法阻塞等待鎖釋放,解決了無效的鎖申請浪費資源的問題: * 基于信息量,當鎖被其它資源占用時,當前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發消息通知待等待的線程進行競爭 * 當 this.await返回false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗 * 當 this.await返回true,進入循環嘗試獲取鎖 */ RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); //await 方法內部是用CountDownLatch來實現阻塞,獲取subscribe異步執行的結果(應用了Netty 的 Future) if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } // ttl 不為空,表示已經有這樣的key了,只能阻塞等待 try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 來個死循環,繼續嘗試著獲取鎖 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } currentTime = System.currentTimeMillis(); /** * 【核心點3】根據鎖TTL,調整阻塞等待時長; * 1、latch其實是個信號量Semaphore,調用其tryAcquire方法會讓當前線程阻塞一段時間,避免在while循環中頻繁請求獲鎖; * 當其他線程釋放了占用的鎖,會廣播解鎖消息,監聽器接收解鎖消息,并釋放信號量,最終會喚醒阻塞在這里的線程 * 2、該Semaphore的release方法,會在訂閱解鎖消息的監聽器消息處理方法org.redisson.pubsub.LockPubSub#onMessage調用; */ //調用信號量的方法來阻塞線程,時長為鎖等待時間和租期時間中較小的那個 if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { // 獲取到鎖或者拋出中斷異常,退訂redisson_lock__channel:{$KEY},不再關注解鎖事件 unsubscribe(subscribeFuture, threadId); } }接著看注釋中提到的 3 個核心點
核心點1-嘗試加鎖:RedissonLock.tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; // leaseTime != -1 說明沒過期 if (leaseTime != -1) { // 實質是異步執行加鎖Lua腳本 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 否則,已經過期了,傳參變為新的時間(續期后) ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 續期 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }異步執行加鎖 Lua 腳本:RedissonLock.tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, // 1.如果緩存中的key不存在,則執行 hincrby 命令(hincrby key UUID+threadId 1), 設值重入次數1 // 然后通過 pexpire 命令設置鎖的過期時間(即鎖的租約時間) // 返回空值 nil ,表示獲取鎖成功 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果key已經存在,并且value也匹配,表示是當前線程持有的鎖,則執行 hincrby 命令,重入次數加1,并且設置失效時間 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果key已經存在,但是value不匹配,說明鎖已經被其他線程持有,通過 pttl 命令獲取鎖的剩余存活時間并返回,至此獲取鎖失敗 "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }KEYS[1] 就是 Collections.singletonList(getName()),表示分布式鎖的key;
ARGV[1] 就是internalLockLeaseTime,即鎖的租約時間(持有鎖的有效時間),默認30s;
ARGV[2] 就是getLockName(threadId),是獲取鎖時set的唯一值 value,即UUID+threadId
看門狗續期:RedissonBaseLock.scheduleExpirationRenewal
核心點2-訂閱解鎖消息:RedissonLock.subscribe
protected final LockPubSub pubSub; public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); //在構造器中初始化pubSub,跟著這幾個get方法會發現他們都是在構造器中初始化的,在PublishSubscribeService中會有 // private final AsyncSemaphore[] locks = new AsyncSemaphore[50]; 這樣一段代碼,初始化了一組信號量 this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } protected RFuture<RedissonLockEntry> subscribe(long threadId) { return pubSub.subscribe(getEntryName(), getChannelName()); } // 在LockPubSub中注冊一個entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry實例中存放著RPromise<RedissonLockEntry>結果,一個信號量形式的鎖和訂閱方法重入計數器 public RFuture<E> subscribe(String entryName, String channelName) { AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); RPromise<E> newPromise = new RedissonPromise<>(); semaphore.acquire(() -> { if (!newPromise.setUncancellable()) { semaphore.release(); return; } E entry = entries.get(entryName); if (entry != null) { entry.acquire(); semaphore.release(); entry.getPromise().onComplete(new TransferListener<E>(newPromise)); return; } E value = createEntry(newPromise); value.acquire(); E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null) { oldValue.acquire(); semaphore.release(); oldValue.getPromise().onComplete(new TransferListener<E>(newPromise)); return; } RedisPubSubListener<Object> listener = createListener(channelName, value); service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); }); return newPromise; }核心點 3 比較簡單,就不說了
解鎖
RedissonLock.unlock()
RedissonBaseLock.unlockAsync
@Override public RFuture<Void> unlockAsync(long threadId) { // 構建一個結果RedissonPromise RPromise<Void> result = new RedissonPromise<>(); // 返回的RFuture如果持有的結果為true,說明解鎖成功,返回NULL說明線程ID異常,加鎖和解鎖的客戶端線程不是同一個線程 RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 取消看門狗的續期任務 cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; }RedissonLock.unlockInnerAsync
// 真正的內部解鎖的方法,執行解鎖的Lua腳本 protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //如果分布式鎖存在,但是value不匹配,表示鎖已經被其他線程占用,無權釋放鎖,那么直接返回空值(解鈴還須系鈴人) "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + //如果value匹配,則就是當前線程占有分布式鎖,那么將重入次數減1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //重入次數減1后的值如果大于0,表示分布式鎖有重入過,那么只能更新失效時間,還不能刪除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + //重入次數減1后的值如果為0,這時就可以刪除這個KEY,并發布解鎖消息,返回1 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", //這5個參數分別對應KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }我只列出了一小部分代碼,更多的內容還是得自己動手
從源碼中,我們可以看到 Redisson 幫我們解決了拋出的第一個問題:失效時間設置多長時間為好?
Redisson 提供了看門狗,每獲得一個鎖時,只設置一個很短的超時時間,同時起一個線程在每次快要到超時時間時去刷新鎖的超時時間。在釋放鎖的同時結束這個線程。
但是沒有解決節點掛掉,丟失鎖的問題,接著來~
四、RedLock
我們上邊介紹的分布式鎖,在某些極端情況下仍然是有缺陷的
客戶端長時間內阻塞導致鎖失效
客戶端 1 得到了鎖,因為網絡問題或者 GC 等原因導致長時間阻塞,然后業務程序還沒執行完鎖就過期了,這時候客戶端 2 也能正常拿到鎖,可能會導致線程安全的問題。
Redis 服務器時鐘漂移
如果 Redis 服務器的機器時間發生了向前跳躍,就會導致這個 key 過早超時失效,比如說客戶端 1 拿到鎖后,key 還沒有到過期時間,但是 Redis 服務器的時間比客戶端快了 2 分鐘,導致 key 提前就失效了,這時候,如果客戶端 1 還沒有釋放鎖的話,就可能導致多個客戶端同時持有同一把鎖的問題。
單點實例安全問題
如果 Redis 是單機模式的,如果掛了的話,那所有的客戶端都獲取不到鎖了,假設你是主從模式,但 Redis 的主從同步是異步進行的,如果 Redis 主宕機了,這個時候從機并沒有同步到這一把鎖,那么機器 B 再次申請的時候就會再次申請到這把鎖,這也是問題
為了解決這些個問題 Redis 作者提出了 RedLock 紅鎖的算法,在 Redission 中也對 RedLock 進行了實現。
Redis 官網對 redLock 算法的介紹大致如下:The Redlock algorithm
在分布式版本的算法里我們假設我們有 N 個 Redis master 節點,這些節點都是完全獨立的,我們不用任何復制或者其他隱含的分布式協調機制。之前我們已經描述了在 Redis 單實例下怎么安全地獲取和釋放鎖。我們確保將在每(N) 個實例上使用此方法獲取和釋放鎖。在我們的例子里面我們設置 N=5,這是一個比較合理的設置,所以我們需要在 5 臺機器或者虛擬機上面運行這些實例,這樣保證他們不會同時都宕掉。為了取到鎖,客戶端應該執行以下操作:
獲取當前 Unix 時間,以毫秒為單位。
依次嘗試從 5 個實例,使用相同的 key 和具有唯一性的 value(例如UUID)獲取鎖。當向 Redis 請求獲取鎖時,客戶端應該設置一個嘗試從某個 Reids 實例獲取鎖的最大等待時間(超過這個時間,則立馬詢問下一個實例),這個超時時間應該小于鎖的失效時間。例如你的鎖自動失效時間為 10 秒,則超時時間應該在 5-50 毫秒之間。這樣可以避免服務器端 Redis 已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務器端沒有在規定時間內響應,客戶端應該盡快嘗試去另外一個 Redis 實例請求獲取鎖。
客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖消耗的時間。當且僅當從大多數(N/2+1,這里是3個節點)的 Redis 節點都取到鎖,并且使用的總耗時小于鎖失效時間時,鎖才算獲取成功。
如果取到了鎖,key 的真正有效時間 = 有效時間(獲取鎖時設置的 key 的自動超時時間) - 獲取鎖的總耗時(詢問各個 Redis 實例的總耗時之和)(步驟 3 計算的結果)。
如果因為某些原因,最終獲取鎖失敗(即沒有在至少 “N/2+1 ”個 Redis 實例取到鎖或者“獲取鎖的總耗時”超過了“有效時間”),客戶端應該在所有的 Redis 實例上進行解鎖(即便某些 Redis 實例根本就沒有加鎖成功,這樣可以防止某些節點獲取到鎖但是客戶端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)。
總結下就是:
客戶端在多個 Redis 實例上申請加鎖,必須保證大多數節點加鎖成功
解決容錯性問題,部分實例異常,剩下的還能加鎖成功
大多數節點加鎖的總耗時,要小于鎖設置的過期時間
多實例操作,可能存在網絡延遲、丟包、超時等問題,所以就算是大多數節點加鎖成功,如果加鎖的累積耗時超過了鎖的過期時間,那有些節點上的鎖可能也已經失效了,還是沒有意義的
釋放鎖,要向全部節點發起釋放鎖請求
如果部分節點加鎖成功,但最后由于異常導致大部分節點沒加鎖成功,就要釋放掉所有的,各節點要保持一致
關于 RedLock,兩位分布式大佬,Antirez 和 Martin 還進行過一場爭論,感興趣的也可以看看
Config config1 = new Config(); config1.useSingleServer().setAddress("127.0.0.1:6379"); RedissonClient redissonClient1 = Redisson.create(config1); Config config2 = new Config(); config2.useSingleServer().setAddress("127.0.0.1:5378"); RedissonClient redissonClient2 = Redisson.create(config2); Config config3 = new Config(); config3.useSingleServer().setAddress("127.0.0.1:5379"); RedissonClient redissonClient3 = Redisson.create(config3); /** * 獲取多個 RLock 對象 */ RLock lock1 = redissonClient1.getLock(lockKey); RLock lock2 = redissonClient2.getLock(lockKey); RLock lock3 = redissonClient3.getLock(lockKey); /** * 根據多個 RLock 對象構建 RedissonRedLock (最核心的差別就在這里) */ RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3); try { /** * 4.嘗試獲取鎖 * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗 * leaseTime 鎖的持有時間,超過這個時間鎖會自動失效(值應設置為大于業務處理的時間,確保在鎖有效期內業務能處理完) */ boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { //成功獲得鎖,在這里處理業務 } } catch (Exception e) { throw new RuntimeException("aquire lock fail"); }finally{ //無論如何, 最后都要解鎖 redLock.unlock(); }最核心的變化就是需要構建多個 RLock ,然后根據多個 RLock 構建成一個 RedissonRedLock,因為 redLock 算法是建立在多個互相獨立的 Redis 環境之上的(為了區分可以叫為 Redission node),Redission node 節點既可以是單機模式(single),也可以是主從模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。這就意味著,不能跟以往這樣只搭建 1個 cluster、或 1個 sentinel 集群,或是1套主從架構就了事了,需要為 RedissonRedLock 額外搭建多幾套獨立的 Redission 節點。
RedissonMultiLock.tryLock
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // try { // return tryLockAsync(waitTime, leaseTime, unit).get(); // } catch (ExecutionException e) { // throw new IllegalStateException(e); // } long newLeaseTime = -1; if (leaseTime != -1) { if (waitTime == -1) { newLeaseTime = unit.toMillis(leaseTime); } else { newLeaseTime = unit.toMillis(waitTime)*2; } } long time = System.currentTimeMillis(); long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } long lockWaitTime = calcLockWaitTime(remainTime); //允許加鎖失敗節點個數限制(N-(N/2+1)) int failedLocksLimit = failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList<>(locks.size()); // 遍歷所有節點通過EVAL命令執行lua加鎖 for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; try { // 對節點嘗試加鎖 if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException e) { // 如果拋出這類異常,為了防止加鎖成功,但是響應失敗,需要解鎖所有節點 unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception e) { lockAcquired = false; } if (lockAcquired) { acquiredLocks.add(lock); } else { /* * 計算已經申請鎖失敗的節點是否已經到達 允許加鎖失敗節點個數限制 (N-(N/2+1)) * 如果已經到達, 就認定最終申請鎖失敗,則沒有必要繼續從后面的節點申請了 * 因為 Redlock 算法要求至少N/2+1 個節點都加鎖成功,才算最終的鎖申請成功 */ if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } if (failedLocksLimit == 0) { unlockInner(acquiredLocks); if (waitTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } } else { failedLocksLimit--; } } //計算 目前從各個節點獲取鎖已經消耗的總時間,如果已經等于最大等待時間,則認定最終申請鎖失敗,返回false if (remainTime != -1) { remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } if (leaseTime != -1) { acquiredLocks.stream() .map(l -> (RedissonLock) l) .map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS)) .forEach(f -> f.syncUninterruptibly()); } return true; }參考與感謝
《Redis —— Distributed locks with Redis》
《Redisson —— Distributed locks and synchronizers》
慢談 Redis 實現分布式鎖 以及 Redisson 源碼解析
理解Redisson中分布式鎖的實現
Remi醬加yooooo~
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的聊聊分布式锁——Redis和Redisson的方式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一文带你深入理解JVM内存模型
- 下一篇: 怎么通俗的理解Netty呢?