年轻人,看看Redisson分布式锁—可重入锁吧!太重要了
1.引言
作為后端開發(fā),對(duì)于所謂的線程安全、高并發(fā)等一系列名詞肯定都不會(huì)陌生,相關(guān)的一些概念及技術(shù)框架是面試中的寵兒,也是工作中解決一些特定場(chǎng)景下的技術(shù)問題的銀彈。今天我們就來聊聊這些銀彈中的其中一枚——分布式鎖,更確切的說是分布式鎖的其中一種輪子:Redisson 的可重入鎖——基于 redis 實(shí)現(xiàn)的分布式鎖。
俗話說得好:面試造火箭,工作擰螺絲(手動(dòng)狗頭)。分布式鎖大家應(yīng)該也都不陌生,在解決譬如分布式服務(wù)下庫存超賣等類似的場(chǎng)景下很常見,大家也應(yīng)該都學(xué)習(xí)和使用過相關(guān)的框架,或者自己實(shí)現(xiàn)過類似的分布式鎖。但一個(gè)完備的分布式鎖實(shí)現(xiàn)應(yīng)該考慮哪些問題,如何優(yōu)雅且全面的實(shí)現(xiàn)一個(gè)分布式鎖,以及這些實(shí)現(xiàn)背后的原理,這些可能才是我們需要考慮的。就我而言,自己造不了火箭,但學(xué)習(xí)一下火箭是怎么造的還是蠻有意思的,說不定哪天螺絲擰多了需要自己造火箭呢。退一步講,火箭造出來的過程也是一個(gè)個(gè)螺絲擰出來的嘛。只要屠龍刀在手,一刀 999,現(xiàn)在能殺雞,將來也能宰牛, skr skr ~
2.分布式鎖概覽
2.1 鎖是干嘛的?
談到鎖,第一印象就是 Java 本身支持的一些加鎖,不管是是 synchronized 還是并發(fā)包下 Lock 的一些實(shí)現(xiàn)如 ReentrantLock,更甚一些無鎖機(jī)制譬如并發(fā)包下的一些原子類如 AtomicInteger,都在不同粒度上保證了線程安全。而所謂的線程實(shí)現(xiàn),歸根到底也就是保證共享區(qū)或者共享變量的安全而已,而聊到線程安全,立馬就能聯(lián)想到三個(gè)特性:
原子性
可見性
有序性
對(duì)于 Java 語言本身的一些保證線程安全的實(shí)現(xiàn)如 synchronized、Lock、原子類更甚至一些在此基礎(chǔ)上的一些線程安全的集合是如何在不同粒度上保證原子性、可見性、有序性的這里就不拆開了,也不是本篇要討論的。更為重要的,我們回到共享資源這個(gè)概念,對(duì)于單點(diǎn)下的應(yīng)用,為什么一提到線程安全就是三大特性,為什么這三個(gè)特性可以保證線程安全。我的通俗理解是:
對(duì)于共享資源的加鎖或者修改(原子類)是原子的,不可拆分的,那么加鎖或者修改本身就要么成功要么失敗,不會(huì)有中間過程的誤判
而可見性是保證共享資源對(duì)所有線程可見,這個(gè)也沒什么好解釋的,只有對(duì)共享資源的任何修改都可感知,才不會(huì)在不同線程下決策不同
有序性的前提是多線程,單線程下的指令重排不會(huì)改變串行結(jié)果,但多線程下的指令重排下對(duì)共享區(qū)域的修改會(huì)相互干擾,所以保證多線程的有序性也是必須的
加鎖是手段,保證共享資源的安全才是目的,單點(diǎn)下 Java 是通過原子性、可見性、有序性來實(shí)現(xiàn)的。
2.2 分布式鎖需要考慮什么?
前面我們廢話了一堆,可以看出來鎖的目的:保證共享資源的安全。現(xiàn)在不考慮單點(diǎn)鎖還是分布式鎖:我們考慮兩個(gè)問題:
共享資源在哪里?
是否保證了原子性、有序性、可見性加鎖就一定是完備的
對(duì)于第一個(gè)問題,在單點(diǎn)情況下,我們可以共享資源是一個(gè)實(shí)例變量或者是一段對(duì)資源進(jìn)行操作的代碼,這些資源在不同線程下共享,而這里的線程都是在一個(gè) JVM 進(jìn)程中。那么如果是分布式系統(tǒng)呢?舉個(gè)例子:某個(gè)商品的總庫存、某個(gè)優(yōu)惠券批次的總數(shù)量,這些是共享資源嗎?當(dāng)然是,只是這里共享這些資源的對(duì)象從一個(gè) JVM 進(jìn)程下的多個(gè)線程變成了多個(gè)服務(wù)節(jié)點(diǎn)下的多個(gè) JVM 進(jìn)程下的多個(gè)線程而已。下面通過兩張圖我們可以對(duì)比一下:
1.單進(jìn)程下共享資源
2.分布式系統(tǒng)下共享資源
可以看出來,在單個(gè) JVM 進(jìn)程中,共享資源只是在同一進(jìn)程下的不同線程共享,不管共享資源是實(shí)例變量、代碼段、或者數(shù)據(jù)庫的資源,所以我們可以通過單點(diǎn)下的原子性、有序性、可見性來保證共享資源的安全。
而在分布式系統(tǒng)下,共享資源的范圍就擴(kuò)大到了多臺(tái)機(jī)器下的多個(gè)進(jìn)程中的多個(gè)線程中。那么再看一下第二個(gè)問題,在分布式系統(tǒng)下原子性、有序性、可見性還管用嗎?或者說這三個(gè)特性在分布式系統(tǒng)下還有用嗎?我的理解是:這三個(gè)特性是依然存在的,只是針對(duì)的對(duì)象和范圍發(fā)生了變化。在單點(diǎn)情況下,任何共享資源都共存于同一個(gè) JVM 進(jìn)程中,共享資源狀態(tài)同步的范圍也只是在線程的工作內(nèi)存和主內(nèi)存之間而已,或者說共享資源的最終狀態(tài)在主內(nèi)存,而其變化狀態(tài)發(fā)生在單點(diǎn)下的多線程的各自工作內(nèi)存中,這三個(gè)特性所在的容器也只是單個(gè) JVM 進(jìn)程而已。而分布式系統(tǒng)下,共享資源的狀態(tài)同步范圍擴(kuò)大了多臺(tái)機(jī)器各自的進(jìn)程(更細(xì)致一點(diǎn)是各個(gè)進(jìn)程中不同的線程之間),共享資源的最終狀態(tài)最終一定要依賴于 JVM 進(jìn)程外的第三方,比如數(shù)據(jù)庫、任意形式的服務(wù)器等等,而共享資源的狀態(tài)變化發(fā)生在多個(gè)進(jìn)程下的多個(gè)線程,因此分布式下的共享資源的安全保證,不僅僅是在線程之間,也在進(jìn)程之間。
2.3 分布式鎖要提供的最基礎(chǔ)的能力
當(dāng)然,前面一段理解可能有點(diǎn)過于冗繁,也可以說:分布式系統(tǒng)下整個(gè)服務(wù)集群是一個(gè)大容器,狀態(tài)的同步范圍在集群服務(wù)所有的線程之間,只是這些線程的交互不再只是通過單機(jī)的緩存一致性協(xié)議(如 MESI 協(xié)議等),而是擴(kuò)大到了端到端的通信即網(wǎng)絡(luò)交互,而共享資源的直接宿主也在第三方譬如其他服務(wù)、數(shù)據(jù)庫等。那這時(shí)候這三個(gè)特性的范圍如果也相應(yīng)的擴(kuò)大到集群線程之間,那共享資源的安全自然也是能夠保證的。當(dāng)然,這么說可能不太嚴(yán)謹(jǐn),因?yàn)槲乙矝]在相關(guān)的資料上看到過有人在分布式系統(tǒng)之間使用原子性、有序性、可見性來說明分布式系統(tǒng)的多線程安全,這是只是借鑒思想,大家如果感覺名詞不夠?qū)I(yè),輕噴。
前面簡(jiǎn)單討論了分布式系統(tǒng)下的共享資源以及保證線程安全的三個(gè)特性,我們考慮一下如何才能在分布式系統(tǒng)這個(gè)大容器下保證這三個(gè)特性,或者說如何在分布式系統(tǒng)下加鎖?首先,鎖的共享范圍必然是要和要保護(hù)的資源一致的,在單點(diǎn)下共享資源就在單個(gè) JVM 進(jìn)程中,那么鎖依靠 JVM 中的一些手段也就足夠了,比如 synchronized、Lock、原子類(當(dāng)然這個(gè)是無鎖的)等。而在分布式系統(tǒng)下,鎖的生存范圍必然是和集群節(jié)點(diǎn)平級(jí)的,要不然各個(gè)節(jié)點(diǎn)各自用自己的鎖,大家對(duì)于對(duì)方的鎖根本不認(rèn)識(shí)也無法交流那豈不是亂掉了。所以分布式鎖必須獨(dú)立于各個(gè)節(jié)點(diǎn)之外,比如借助 redis、zookeeper 等實(shí)現(xiàn),當(dāng)然,本篇我們討論的是 redis 的分布式鎖,但我認(rèn)為前面的思想是通用的,哪怕不用 redis、zookeeper 也可以實(shí)現(xiàn),只是實(shí)現(xiàn)方式、效率等方面有所差異。即分布式鎖最起碼要實(shí)現(xiàn)進(jìn)程間共享(這里的共享是指在不同進(jìn)程間是一套,而不是說可以同時(shí)持有),并且能夠保證共享資源的原子性、有序性、可見性。
這里多說一點(diǎn),由于分布式鎖宿主在 JVM 進(jìn)程之間,各個(gè)進(jìn)程加鎖以及同步是通過端到端的進(jìn)程通信,那么此時(shí)分布式系統(tǒng)下的可見性、有序性是自然滿足的。首先可見性很好理解,因?yàn)楣蚕碣Y源的獲取本身就是服務(wù)與服務(wù)間的通信,可見性的粒度也應(yīng)該在服務(wù),只要共享資源發(fā)生改變,任何一個(gè)服務(wù)都可以查詢到(不要說事務(wù)什么的,我覺得這里共享資源的狀態(tài)同步應(yīng)該是在事務(wù)的上層來看)。而有序性也是在分布式鎖的前提下,不同服務(wù)之間對(duì)于共享資源的變更也變成了時(shí)間上是串行的,那么也自然滿足的,當(dāng)然這里會(huì)有性能的犧牲。那么原子性呢?我理解這里的原子性是靠分布式鎖的獲取等來保證的,只要加鎖、釋放等是原子的,那么鎖所保護(hù)的資源(或操作)對(duì)于同級(jí)的操作就是一個(gè)原子的。
2.4 分布式鎖還要考慮什么?
前面討論了分布式鎖怎么保證共享資源的安全,但是由于分布式鎖宿主在譬如 redis、zookeeper 等中間件中,加鎖、釋放、鎖續(xù)期等也是在進(jìn)程與 redis 之間通信,那么就引出了一些單點(diǎn)加鎖不存在的問題:那就是服務(wù)如果宕機(jī)了怎么辦?或者加鎖是有時(shí)間的,如果時(shí)間過了持有鎖的任務(wù)還沒有完成怎么辦?這時(shí)候看起來就像下圖可能出現(xiàn)的情況
出現(xiàn)這些問題的原因是雖然我們將分布式系統(tǒng)和鎖的宿主看作一個(gè)大的通信系統(tǒng),但其卻是離散的,離散的節(jié)點(diǎn)自身可能存活、死亡等,在單個(gè)離散節(jié)點(diǎn)不存在時(shí),其持有的鎖卻可能仍在另外一個(gè)離散節(jié)點(diǎn)存在(這里指的是依靠 redis 實(shí)現(xiàn)的分布式鎖),那么對(duì)于其他節(jié)點(diǎn)來說鎖也就永遠(yuǎn)無法獲取了。反過來,如果持有鎖的離散的服務(wù)節(jié)點(diǎn)對(duì)于共享資源的操作還沒有完成,Redis 由于鎖的時(shí)間到期而釋放鎖,那么其他的服務(wù)節(jié)點(diǎn)就可以獲取到本不該獲取的鎖了,這時(shí)候共享資源必然是不安全的。而這些在單個(gè)進(jìn)程中的鎖不會(huì)存在,因?yàn)閱芜M(jìn)程下的鎖、線程、資源都在一個(gè)容器即 JVM 進(jìn)程中,JVM 進(jìn)程死掉的話這些也就一起死掉了,自然也不會(huì)存在之前說的問題。可見,分布式鎖不僅要維護(hù)共享資源的安全,還要維護(hù)鎖自身在不同進(jìn)程下的安全問題。
3. redis 分布式鎖的一種實(shí)現(xiàn)—— Redisson 的可重入鎖
3.1 如何使用 Redisson 的分布式鎖
寫到這里,我覺得前面的文字鋪墊的太多了,代碼和圖片太少了,但對(duì)我個(gè)人而言我覺得會(huì)使用分布式鎖沒有什么太大的意義,所以我前面還是堅(jiān)持寫了一些冗繁的廢話。那么,我們先看一下如何最簡(jiǎn)單的使用 Redisson 的分布式鎖吧,畢竟 Talk is cheap,show me the code !
@Autowired private RedissonClient redisson;private static final String LOCK_PREFIX = "my:lock:";@GetMapping("redis/lock/{seq}") public String lock(@PathVariable String seq) {RLock lock = redisson.getLock(LOCK_PREFIX + seq);try {boolean lockSuccess = lock.tryLock(5, TimeUnit.SECONDS);if (lockSuccess) {System.out.println("get lock success");} else {System.out.println("get lock fail");}TimeUnit.SECONDS.sleep(15);} catch (InterruptedException e) {e.printStackTrace();return seq + " mission dead";} finally {lock.unlock();}return seq + " mission completed"; }@Bean public RedissonClient redissonClient() {Config config = new Config();// 單點(diǎn)模式config.useSingleServer().setAddress("redis://127.0.0.1:6379");// 集群模式/*config.useClusterServers().addNodeAddress("redis://127.0.0.1:7000").addNodeAddress("redis://127.0.0.1:7001").addNodeAddress("redis://127.0.0.1:7002").addNodeAddress("redis://127.0.0.1:7003").addNodeAddress("redis://127.0.0.1:7004").addNodeAddress("redis://127.0.0.1:7005");*/return Redisson.create(config); }Redisson 實(shí)現(xiàn)的分布式鎖的使用就是這么簡(jiǎn)單,這個(gè)也沒什么好說的,我們公司的不少服務(wù)應(yīng)該也都有過使用,就我接觸到的有兌換券、優(yōu)惠券等。下面我們就基于這段簡(jiǎn)單的代碼來理解一下 Redisson 的分布式鎖是如何實(shí)現(xiàn)的。
3.1 RedissonClient:同 redis 通信的組件
public class Redisson implements RedissonClient {static {RedissonObjectFactory.warmUp();RedissonReference.warmUp();}protected final QueueTransferService queueTransferService = new QueueTransferService();protected final EvictionScheduler evictionScheduler;protected final ConnectionManager connectionManager;protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = PlatformDependent.newConcurrentHashMap();protected final Config config;protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();protected final ConcurrentMap<String, ResponseEntry> responses = PlatformDependent.newConcurrentHashMap();protected Redisson(Config config) {this.config = config;Config configCopy = new Config(config);connectionManager = ConfigSupport.createConnectionManager(configCopy);evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());}public EvictionScheduler getEvictionScheduler() {return evictionScheduler;}public CommandExecutor getCommandExecutor() {return connectionManager.getCommandExecutor();}public ConnectionManager getConnectionManager() {return connectionManager;}/*** Create sync/async Redisson instance with default config** @return Redisson instance*/public static RedissonClient create() {Config config = new Config();config.useSingleServer().setTimeout(1000000).setAddress("redis://127.0.0.1:6379"); // config.useMasterSlaveConnection().setMasterAddress("127.0.0.1:6379").addSlaveAddress("127.0.0.1:6389").addSlaveAddress("127.0.0.1:6399"); // config.useSentinelConnection().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379"); // config.useClusterServers().addNodeAddress("127.0.0.1:7000");return create(config);}/*** Create sync/async Redisson instance with provided config** @param config for Redisson* @return Redisson instance*/public static RedissonClient create(Config config) {Redisson redisson = new Redisson(config);if (config.isReferenceEnabled()) {redisson.enableRedissonReferenceSupport();}return redisson;}@Overridepublic RLock getLock(String name) {return new RedissonLock(connectionManager.getCommandExecutor(), name);}// 省略巴拉巴拉 }不得不說,這段代碼復(fù)制粘貼的是有點(diǎn)臭長(zhǎng)啊,畢竟 CV 工程師,哈哈。總結(jié)起來就是一句話:Redisson 類是 RedissonClient 的實(shí)現(xiàn),封裝了一些配置、同 redis 的連接管理、一些定時(shí)任務(wù)、發(fā)布訂閱組件等,另外提供一些獲取 Redisson 基于 Redis 實(shí)現(xiàn)的分布式鎖、分布式集合、分布式信號(hào)量等接口方法,比如我們的分布式鎖-可重入鎖。
public RLock getLock(String name) {return new RedissonLock(connectionManager.getCommandExecutor(), name); }而這里實(shí)際上我們獲取到的只是 Redisson 封裝好的對(duì)分布式鎖的抽象的對(duì)象而已,并不是真正的就執(zhí)行加鎖操作了。而加鎖、釋放鎖等就是基于 Redisson 的鎖接口 RLock 來做的,而本文討論的可重入鎖則 RedissonLock 是其中一種實(shí)現(xiàn)。
3.2 RedissonLock 是如何加鎖的
下面我們就以 demo 的代碼為入口看一下 RedissonLock 是如何加鎖的:
@Override public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return tryLock(waitTime, -1, unit); }通過 demo 中使用的 tryLock(long waitTime, TimeUnit unit) 我們可以看出來,真正調(diào)用的是下面這個(gè)方法:
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// 加鎖成功 返回null 否則返回的是該鎖將要過期的剩余時(shí)間// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;// 未獲取到鎖,且第一次嘗試獲取鎖花費(fèi)時(shí)間超過了預(yù)設(shè)等待時(shí)間,則獲取鎖失敗,不再等待if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);} // return get(tryLockAsync(waitTime, leaseTime, unit)); }而關(guān)于這段代碼呢,我們先忽略其他邏輯,重點(diǎn)看這一行:
Long ttl = tryAcquire(leaseTime, unit, threadId);這一行第一次嘗試加鎖,接著往下看:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId)); }private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 如果自己設(shè)置了鎖釋放時(shí)間,則獲取鎖后直接返回,且不會(huì)設(shè)置定時(shí)刷新的邏輯(上層方法沒有設(shè)置定時(shí)任務(wù)),則獲取到鎖后超過設(shè)定的事件后自動(dòng)釋放// 或者在設(shè)定時(shí)間內(nèi)手動(dòng)調(diào)用釋放鎖if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {// 未獲取到鎖if (e != null) {return;}// 獲取到鎖,開啟自動(dòng)延期鎖的定時(shí)任務(wù)// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture; }可以看出來對(duì)于 leaseTime != -1 的判斷會(huì)走兩種方式:真正的加鎖是通過 tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) 這個(gè)方法來做的,而當(dāng) leaseTime != -1 時(shí),直接返回加鎖結(jié)果了,而當(dāng) leaseTime = -1 時(shí),在返回加鎖結(jié)果之前,會(huì)監(jiān)聽加鎖的結(jié)果:如果加鎖成功了還會(huì)開啟一個(gè)自動(dòng)延期鎖的定時(shí)任務(wù)。而這個(gè) leaseTime 指的就是加鎖成功后鎖的默認(rèn)持有時(shí)間。當(dāng)我們不指定 leaseTime 時(shí),默認(rèn)的鎖持有時(shí)間是 30 秒(這個(gè)時(shí)間叫作看門狗 - lockWatchdogTimeout),并且每 10 秒(30/3)去確認(rèn)一下鎖的狀態(tài):如果鎖仍未被釋放,則重新設(shè)置鎖的過期時(shí)間為 30 秒(當(dāng)然,持有鎖的服務(wù)宕機(jī)后在 30 秒后鎖會(huì)自動(dòng)釋放,這個(gè)我們后面再說)。而當(dāng)我們指定 leaseTime 時(shí),我們可以看出來前面的代碼不會(huì)走到定時(shí)續(xù)期鎖的邏輯,這時(shí)表示:成功獲取到鎖后,在 leaseTime 后,如果鎖仍沒有被服務(wù)主動(dòng)釋放,鎖將自動(dòng)過期,而不會(huì)管持有鎖的線程有沒有完成對(duì)應(yīng)的操作,相當(dāng)于在持有所得服務(wù)執(zhí)行了比較耗時(shí)的任務(wù)且未完成時(shí),這時(shí)鎖已經(jīng)被釋放,這時(shí)候自然也是不安全的。上面兩段代碼的流程如下:
從前面的流程圖我們可以看出,RedissonLock.tryLock(long waitTime, long leaseTime, TimeUnit unit) 是對(duì)于 waitTime, leaseTime 入?yún)?huì)產(chǎn)生不同的行為,這也是 RedissonLock 嘗試加鎖相對(duì)最完整的一個(gè)鏈路,其他方法譬如我們直接使用的 tryLock(long waitTime, TimeUnit unit) 也只是復(fù)用了其中一個(gè)邏輯分支。
3.3 RedissonLock分布式鎖的數(shù)據(jù)結(jié)構(gòu)與加鎖原理
前面一小節(jié)我們看到了 RedissonLock 完整的加鎖鏈路,那么分布式鎖在 Redis 是如何實(shí)現(xiàn)的呢?怎么判斷加鎖失敗以及鎖的剩余時(shí)間呢?現(xiàn)在我們就來看看這個(gè)。
通過前面的代碼我們可以看出來真正執(zhí)行加鎖以及返回加鎖結(jié)果是調(diào)用了下面的方法:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);// 鎖不存在,加鎖成功,設(shè)置hash數(shù)據(jù)結(jié)構(gòu)鎖: 鎖名 -> 加鎖線程:id -> 加鎖次數(shù)(1)// 鎖存在且是本線程的鎖 加鎖次數(shù)增加:鎖名 -> 加鎖線程:id -> 加鎖次數(shù)+1// 鎖存在且不是本線程的鎖 加鎖失敗 返回鎖剩余過期時(shí)間return evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }到了這里,我們才能看到 RedissonLock 的加鎖(僅僅指執(zhí)行加鎖這一動(dòng)作)以及鎖在 Redis 中的數(shù)據(jù)結(jié)構(gòu)的廬山真面目:可以看出來上面是執(zhí)行了一段 lua 腳本,這段 lua 腳 本是會(huì)涉及到多個(gè)判斷以及數(shù)據(jù)修改的,這個(gè)時(shí)候就可以回到我們說的關(guān)于加鎖的原子性問題了。先不看這段加鎖的邏輯,只考慮加鎖過程涉及到多個(gè)判斷以及操作時(shí),那么那些動(dòng)作必須是原子的,要么同時(shí)成功要么同時(shí)失敗,而 RedissonLock 實(shí)現(xiàn)加鎖過程的原子性就是借助了 lua 腳本(鎖延期等也會(huì)使用 lua 腳本)。那么我們看一下這段 lua 腳本的含義吧:結(jié)合注釋,Redisson 實(shí)現(xiàn)的可重入鎖的數(shù)據(jù)結(jié)構(gòu)使用了 Redis中 的 hash 對(duì)象數(shù)據(jù)類型來實(shí)現(xiàn),其在 Redis 中大概長(zhǎng)這個(gè)樣子:
從上面這張圖我們可以看出來 Redisson 的分布式鎖在 Redis 中的 hash 數(shù)據(jù)結(jié)構(gòu):{鎖名}:{uuid:threadId}:{count},另外對(duì)于已經(jīng)存在的健值對(duì)初始化過期時(shí)間為 30 秒。結(jié)合前面的加鎖流程圖,我們就可以看出來 Redisson 分布式鎖是如何實(shí)現(xiàn)加鎖的原子性,以下操作是一個(gè)原子操作:某一個(gè)節(jié)點(diǎn)下的線程加鎖首先判斷該線程對(duì)于的 hash 鍵是否存在
若不存在(鎖未被持有),則將鎖的鍵設(shè)置為線程 id 對(duì)應(yīng)的唯一標(biāo)識(shí),值為 1 (第一次加鎖),返回空表示加鎖成功
鎖存在且對(duì)應(yīng)的是本線程,說明之前加鎖的線程為同一個(gè),則將 hash 值 1 (加鎖次數(shù),可重入),另外將該鎖對(duì)應(yīng)的存活時(shí)間重新設(shè)置,返回空表示加鎖成功
鎖存在但鍵對(duì)應(yīng)的不是當(dāng)前線程,說明持有鎖的是其他線程,返回鎖剩余的過期時(shí)間表示加鎖失敗
到這里,Redisson 的分布式鎖加鎖的流程以及鎖在 Redis 中的數(shù)據(jù)結(jié)構(gòu)已經(jīng)清楚了,這時(shí)候我們可以對(duì)比一下 Java 自身實(shí)現(xiàn)的可重入鎖 ReentrantLock。對(duì)于 ReentrantLock,甚至更多的線程安全組件如 Semaphore、CountDownLatch 等,其底層的實(shí)現(xiàn)都依賴于 AQS(AbstractQueuedSynchronizer),而 AQS 本身是一個(gè)隊(duì)列,隊(duì)列中的節(jié)點(diǎn) Node 同樣也是封裝了線程的對(duì)象,只是 AQS 是本地單節(jié)點(diǎn)的,Redis 卻是分布式的可以被任何 JVM 共享。另外 AQS 中還封裝了一個(gè) int 類型的狀態(tài)變量 state:
/*** The synchronization state.*/ private volatile int state;當(dāng)涉及到具體的實(shí)現(xiàn)時(shí),state 有不同的含義,對(duì) ReentrantLock 來說 state 就是可重入鎖的加鎖次數(shù),對(duì) Semaphore 來說 state 就是信號(hào)量,對(duì) CountDownLatch 來說就是計(jì)數(shù)量。可以看出來, Java 的 AQS 一些抽象和 Redisson 實(shí)現(xiàn)的分布式鎖是可以類比的,比如 thread 標(biāo)識(shí)對(duì)應(yīng)的封裝,加鎖次數(shù)等。只是 AQS 的實(shí)現(xiàn)原子操作一般是基于原子類的 CAS,而 Redisson 實(shí)現(xiàn)原子操作是基于 Redis 的 lua 腳本。另外 AQS 實(shí)現(xiàn)隊(duì)列節(jié)點(diǎn)狀態(tài)同步是基于隊(duì)列本身可以遍歷的特性以及節(jié)點(diǎn)中的幾種狀態(tài)(這里不再贅述),而 Redisson 不同線程之間阻塞同步是基于發(fā)布訂閱(后面會(huì)提到)。可以得出:本地鎖和分布式鎖很多概念和思想是相似的,甚至其數(shù)據(jù)結(jié)構(gòu)以及目標(biāo)都是可類比的,只是分布式鎖對(duì)本地鎖的對(duì)象、范圍、通信方式基于服務(wù)之間通信進(jìn)行了實(shí)現(xiàn)。關(guān)于 AQS 的原理這里不再展開,大家可以參考 JDK 的源碼。
3.3 鎖的自動(dòng)續(xù)期
前面我們從 Redisson 加鎖為入口,分析了加鎖的整體流程并詳細(xì)看了加鎖時(shí)的細(xì)節(jié)以及數(shù)據(jù)結(jié)構(gòu),現(xiàn)在我們看一下 Redisson 分布式鎖是如何自動(dòng)續(xù)期的。前面我們已經(jīng)提到了當(dāng)?shù)谝淮渭渔i成功時(shí)會(huì)開啟自動(dòng)續(xù)期的定時(shí)任務(wù),對(duì)于的代碼入口即為:
// 獲取到鎖,開啟自動(dòng)延期鎖的定時(shí)任務(wù)// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}繼續(xù)往下看,進(jìn)入如下代碼:
private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {// 表示不是第一次加鎖 則加鎖次數(shù)加 1 不會(huì)再開啟續(xù)期鎖 因?yàn)榈谝淮渭渔i時(shí)調(diào)用 scheduleExpirationRenewal(long threadId) 會(huì)進(jìn)入// else 會(huì)開啟 renewExpiration()oldEntry.addThreadId(threadId);} else {// 在加鎖時(shí)第一次調(diào)用 開啟自動(dòng)續(xù)期(定時(shí)重設(shè)鎖的過期時(shí)間)entry.addThreadId(threadId);renewExpiration();} }ExpirationEntry 封裝了定時(shí)任務(wù)對(duì)應(yīng)的線程對(duì)象,結(jié)合注釋這一段也不必展開,我們繼續(xù)往下看真正開啟續(xù)期的方法 renewExpiration():
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());// 鎖已經(jīng)不存在了直接返回if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);// 這里監(jiān)聽續(xù)期 成功后遞歸調(diào)用(十秒后再次重復(fù))future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// reschedule itselfrenewExpiration();}});}// 10 秒續(xù)期一次(如果還持有鎖) 30000/3}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task); }可以看出來鎖自動(dòng)續(xù)期的流程為:
若鎖已經(jīng)不存在了(比如手動(dòng)釋放了鎖),直接返回
若鎖仍存在,調(diào)用 Redis 異步設(shè)置鎖的過期時(shí)間 renewExpirationAsync(threadId),同時(shí)監(jiān)聽續(xù)期結(jié)果
若續(xù)期成功,則遞歸調(diào)用 renewExpiration(),否則異常返回
以上過程每 10 秒重復(fù)一次 (internalLockLeaseTime / 3)
然后我們看一下調(diào)用 Redis 對(duì)鎖進(jìn)行續(xù)期的過程:
protected RFuture<Boolean> renewExpirationAsync(long threadId) {// 當(dāng)前線程持有的鎖還存在 重新設(shè)置鎖的過期時(shí)間(默認(rèn) 30 秒)// 否則失敗return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId)); }這里同樣使用 lua 腳本來執(zhí)行了一段原子操作:
判斷當(dāng)前線程對(duì)應(yīng)的鎖是否存在,若存在則重新設(shè)置鎖的過期時(shí)間(默認(rèn)為 30 秒),返回 true
否則返回 false
3.4 鎖的手動(dòng)釋放
至此,Redisson 的加鎖、自動(dòng)續(xù)期我們已經(jīng)討論過了,現(xiàn)在看一下鎖的手動(dòng)釋放, 其入口為:
public void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException)e.getCause();} else {throw e;}}}接著看底層實(shí)現(xiàn) unlockAsync(final long threadId):
public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();// 釋放鎖RFuture<Boolean> future = unlockInnerAsync(threadId);// 監(jiān)聽釋放鎖結(jié)果future.onComplete((opStatus, e) -> {// 取消自動(dòng)續(xù)期cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);return;}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result; }可以看出來,釋放鎖會(huì)執(zhí)行以下操作:
調(diào)用 Redis 釋放鎖
監(jiān)聽釋放鎖結(jié)果,取消自動(dòng)續(xù)期
然后看一下真正釋放鎖的操作:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {// 若鎖不存在 返回// 若鎖存在 加鎖次數(shù) -1// 若加鎖次數(shù)仍不等于 0 (可重入),重新設(shè)置鎖的過期時(shí)間,返回// 若加鎖次數(shù)減為 0,刪除鎖,同步發(fā)布釋放鎖事件,返回return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }上面這段 lua 腳本的含義基本與注釋一致,這里不再贅述。至此,鎖會(huì)被原子性的釋放。
3.5 加鎖等待
討論了加鎖成功、鎖自動(dòng)續(xù)期、鎖釋放后,我們?cè)賮砜匆幌录渔i等待。前面加鎖的代碼中,我們可以看到,若制定了加鎖的等待時(shí)間 waitTime 時(shí),若鎖已經(jīng)被占有,加鎖會(huì)失敗并返回鎖剩余的過期時(shí)間,然后循環(huán)嘗試加鎖,對(duì)應(yīng)以下代碼:
current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 等待鎖釋放 循環(huán)獲取鎖while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}在上面的 while 循環(huán)中,我們可以看出來,每次循環(huán)都會(huì)調(diào)用
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);這里就回到了我們之前分析的加鎖流程,不再贅述。整個(gè)加鎖等待流程如下:
如果加鎖成功,返回成功
加鎖失敗,基于發(fā)布訂閱(基于 Semaphore )阻塞,收到鎖釋放消息后繼續(xù)循環(huán),再次嘗試加鎖
如果整個(gè)加鎖嘗試時(shí)間超過了 waitTime 后仍然未搶到鎖,返回加鎖失敗
4.總結(jié)
至此,Redisson 基于 Redis 實(shí)現(xiàn)的分布式鎖的可重入鎖 RedissonLock 的大致原理就分析完了。我們分析了分布式系統(tǒng)下保證共享資源安全的一些必要特性,然后針對(duì) Redisson 實(shí)現(xiàn)的可重入鎖的加鎖、自動(dòng)續(xù)期、鎖釋放、鎖等待的代碼進(jìn)行了分析,整個(gè)過程有所簡(jiǎn)略,只關(guān)注了整體流程。更為細(xì)節(jié)的內(nèi)容如如何和 Redis 進(jìn)行通信、配置管理覆蓋、發(fā)布訂閱如何實(shí)現(xiàn),感興趣的話大家可以自己探索一下。
全文完
以下文章您可能也會(huì)感興趣:
簡(jiǎn)單說說spring的循環(huán)依賴
Mysql redo log 漫游
單元測(cè)試的實(shí)踐之路
可線性化檢查:與 NP 完全問題做斗爭(zhēng)
Java 類型系統(tǒng)從入門到放棄
Webpack 快速上手(下)
Webpack 快速上手(中)
Webpack 快速上手(上)
Airbnb 的 React Native 之路(下)
Airbnb 的 React Native 之路(上)
零基礎(chǔ)玩轉(zhuǎn) Serverless
iOS 開發(fā):深入理解 Xcode 工程結(jié)構(gòu)(一)
三大報(bào)表:財(cái)務(wù)界的通用語言
四維閱讀法 - 我的高效學(xué)習(xí)“秘技”
一個(gè)創(chuàng)業(yè)公司的容器化之路(三) - 容器即未來
一個(gè)創(chuàng)業(yè)公司的容器化之路(二) - 容器化
一個(gè)創(chuàng)業(yè)公司的容器化之路(一) - 容器化之前
樂高式微服務(wù)化改造(下)
樂高式微服務(wù)化改造(上)
我們正在招聘 Java 工程師,歡迎有興趣的同學(xué)投遞簡(jiǎn)歷到 rd-hr@xingren.com 。
總結(jié)
以上是生活随笔為你收集整理的年轻人,看看Redisson分布式锁—可重入锁吧!太重要了的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于Java医院在线挂号预约系统设计实现
- 下一篇: 数智融合,虚实联动——以数字孪生释放数据