生活随笔
收集整理的這篇文章主要介紹了
基于Redis的分布式锁实现
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
本文轉自
一、分布式鎖概覽
在多線程的環境下,為了保證一個代碼塊在同一時間只能由一個線程訪問,Java中我們一般可以使用synchronized語法和ReetrantLock去保證,這實際上是本地鎖的方式。但是現在公司都是流行分布式架構,在分布式環境下,如何保證不同節點的線程同步執行呢?實際上,對于分布式場景,我們可以使用分布式鎖,它是控制分布式系統之間互斥訪問共享資源的一種方式。 比如說在一個分布式系統中,多臺機器上部署了多個服務,當客戶端一個用戶發起一個數據插入請求時,如果沒有分布式鎖機制保證,那么那多臺機器上的多個服務可能進行并發插入操作,導致數據重復插入,對于某些不允許有多余數據的業務來說,這就會造成問題。而分布式鎖機制就是為了解決類似這類問題,保證多個服務之間互斥的訪問共享資源,如果一個服務搶占了分布式鎖,其他服務沒獲取到鎖,就不進行后續操作。大致意思如下圖所示(不一定準確):
二、分布式鎖完善過程
現在很多服務都是以微服務集群的方式運行的,那么單純一個本地鎖是無法保證鎖的一致性的,因為兩個微服務中的鎖就不是同一把鎖,這時候就得使用到分布式鎖了 這種情況下,幾乎每一個微服務都會對數據庫進行一次查詢。
分布式鎖思路
三、分布式鎖的特點
分布式鎖一般有如下的特點:
互斥性: 同一時刻只能有一個線程持有鎖 可重入性: 同一節點上的同一個線程如果獲取了鎖之后能夠再次獲取鎖 鎖超時:和J.U.C中的鎖一樣支持鎖超時,防止死鎖 高性能和高可用: 加鎖和解鎖需要高效,同時也需要保證高可用,防止分布式鎖失效 具備阻塞和非阻塞性:能夠及時從阻塞狀態中被喚醒
四、分布式鎖的實現方式
我們一般實現分布式鎖有以下幾種方式:
基于數據庫 基于Redis 基于zookeeper
參考文章
本篇文章主要介紹基于Redis如何實現分布式鎖
五、Redis的分布式鎖實現
利用setnx+expire命令 (錯誤的做法)
Redis的SETNX命令,setnx key value,將key設置為value,當鍵不存在時,才能成功,若鍵存在,什么也不做,成功返回1,失敗返回0 。 SETNX實際上就是SET IF NOT Exists的縮寫因為分布式鎖還需要超時機制,所以我們利用expire命令來設置,所以利用setnx+expire命令的核心代碼如下:
public boolean tryLock ( String key
, String requset
, int timeout
) { Long result
= jedis
. setnx ( key
, requset
) ; if ( result
== 1L ) { return jedis
. expire ( key
, timeout
) == 1L ; } else { return false ; }
}
實際上上面的步驟是有問題的,setnx和expire是分開的兩步操作,不具有原子性,如果執行完第一條指令應用異常或者重啟了,鎖將無法過期。 一種改善方案就是使用Lua腳本來保證原子性(包含setnx和expire兩條指令)
使用Lua腳本(包含setnx和expire兩條指令)
public boolean tryLock_with_lua ( String key
, String UniqueId , int seconds
) { String lua_scripts
= "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end" ; List < String > keys
= new ArrayList < > ( ) ; List < String > values
= new ArrayList < > ( ) ; keys
. add ( key
) ; values
. add ( UniqueId ) ; values
. add ( String . valueOf ( seconds
) ) ; Object result
= jedis
. eval ( lua_scripts
, keys
, values
) ; return result
. equals ( 1L ) ;
}
使用set key value [EX seconds][PX milliseconds][NX|XX]命令 (正確做法)
SET key value
[ EX seconds
] [ PX milliseconds
] [ NX
| XX
]
參數說明:
EX seconds: 設定過期時間,單位為秒 PX milliseconds: 設定過期時間,單位為毫秒 NX: 僅當key不存在時設置值 XX: 僅當key存在時設置值 set命令的nx選項,就等同于setnx命令,代碼過程如下:
public boolean tryLock_with_set ( String key
, String UniqueId , int seconds
) { return "OK" . equals ( jedis
. set ( key
, UniqueId , "NX" , "EX" , seconds
) ) ;
}
value必須要具有唯一性,我們可以用UUID來做,設置隨機字符串保證唯一性,至于為什么要保證唯一性?假如value不是隨機字符串,而是一個固定值,那么就可能存在下面的問題:
1.客戶端1獲取鎖成功 2.客戶端1在某個操作上阻塞了太長時間 3.設置的key過期了,鎖自動釋放了 4.客戶端2獲取到了對應同一個資源的鎖 5.客戶端1從阻塞中恢復過來,因為value值一樣,所以執行釋放鎖操作時就會釋放掉客戶端2持有的鎖,這樣就會造成問題
所以通常來說,在釋放鎖時,我們需要對value進行驗證
釋放鎖的實現
釋放鎖時需要驗證value值,也就是說我們在獲取鎖的時候需要設置一個value,不能直接用del key這種粗暴的方式,因為直接del key任何客戶端都可以進行解鎖了,所以解鎖時,我們需要判斷鎖是否是自己的,基于value值來判斷,代碼如下:
public boolean releaseLock_with_lua ( String key
, String value
) { String luaScript
= "if redis.call('get',KEYS[1]) == ARGV[1] then " + "return redis.call('del',KEYS[1]) else return 0 end" ; return jedis
. eval ( luaScript
, Collections . singletonList ( key
) , Collections . singletonList ( value
) ) . equals ( 1L ) ;
}
這里使用Lua腳本的方式,盡量保證原子性。 使用 set key value [EX seconds][PX milliseconds][NX|XX] 命令 看上去很OK,實際上在Redis集群的時候也會出現問題,比如說A客戶端在Redis的master節點上拿到了鎖,但是這個加鎖的key還沒有同步到slave節點,master故障,發生故障轉移,一個slave節點升級為master節點,B客戶端也可以獲取同個key的鎖,但客戶端A也已經拿到鎖了,這就導致多個客戶端都拿到鎖。
總結就是: 單實例redis實現分布式鎖肯定不是很可靠加鎖成功之后,結果 Redis 服務宕機了,就涼了。這時候會提出來將 Redis 主從部署。即使是主從,也是存在巧合的。所以針對Redis集群這種情況,還有其他方案。
Redlock算法 與 Redisson 實現
Redis作者 antirez基于分布式環境下提出了一種更高級的分布式鎖的實現Redlock,原理如下: Redlock參考文章:Redis分布式鎖最牛逼的實現 和 redis.io/topics/dist…
假設有5個獨立的Redis節點(注意這里的節點可以是5個Redis單master實例,也可以是5個Redis Cluster集群,但并不是有5個主節點的cluster集群):
獲取當前Unix時間,以毫秒為單位 依次嘗試從5個實例,使用相同的key和具有唯一性的value(例如UUID)獲取鎖,當向Redis請求獲取鎖時,客戶端應該設置一個網絡連接和響應超時時間,這個超時時間應用小于鎖的失效時間,例如你的鎖自動失效時間為10s,則超時時間應該在5~50毫秒之間,這樣可以避免服務器端Redis已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務端沒有在規定時間內響應,客戶端應該盡快嘗試去另外一個Redis實例請求獲取鎖 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖使用的時間,當且僅當從大多數(N/2+1,這里是3個節點)的Redis節點都取到鎖,并且使用的時間小于鎖失敗時間時,鎖才算獲取成功。 如果取到了鎖,key的真正有效時間等于有效時間減去獲取鎖所使用的時間(步驟3計算的結果) 如果某些原因,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在所有的Redis實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功,防止某些節點獲取到鎖但是客戶端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)
更多關于redisoon實現分布式的參考文章
六、Redis實現的分布式鎖輪子
下面利用SpringBoot + Jedis + AOP的組合來實現一個簡易的分布式鎖。
自定義注解 自定義一個注解,被注解的方法會執行獲取分布式鎖的邏輯
@Target ( ElementType . METHOD
)
@Retention ( RetentionPolicy . RUNTIME
)
@Documented
@Inherited
public @interface RedisLock { String key ( ) ; int expire ( ) default 5 ; long waitTime ( ) default Long . MIN_VALUE
; TimeUnit timeUnit ( ) default TimeUnit . SECONDS
;
}
AOP攔截器實現 在AOP中我們去執行獲取分布式鎖和釋放分布式鎖的邏輯,代碼如下:
@Aspect
@Component
public class LockMethodAspect { @Autowired private RedisLockHelper redisLockHelper
; @Autowired private JedisUtil jedisUtil
; private Logger logger
= LoggerFactory . getLogger ( LockMethodAspect . class ) ; @Around ( "@annotation(com.redis.lock.annotation.RedisLock)" ) public Object around ( ProceedingJoinPoint joinPoint
) { Jedis jedis
= jedisUtil
. getJedis ( ) ; MethodSignature signature
= ( MethodSignature ) joinPoint
. getSignature ( ) ; Method method
= signature
. getMethod ( ) ; RedisLock redisLock
= method
. getAnnotation ( RedisLock . class ) ; String value
= UUID
. randomUUID ( ) . toString ( ) ; String key
= redisLock
. key ( ) ; try { final boolean islock
= redisLockHelper
. lock ( jedis
, key
, value
, redisLock
. expire ( ) , redisLock
. timeUnit ( ) ) ; logger
. info ( "isLock : {}" , islock
) ; if ( ! islock
) { logger
. error ( "獲取鎖失敗" ) ; throw new RuntimeException ( "獲取鎖失敗" ) ; } try { return joinPoint
. proceed ( ) ; } catch ( Throwable throwable
) { throw new RuntimeException ( "系統異常" ) ; } } finally { logger
. info ( "釋放鎖" ) ; redisLockHelper
. unlock ( jedis
, key
, value
) ; jedis
. close ( ) ; } }
}
Redis實現分布式鎖核心類
@Component
public class RedisLockHelper { private long sleepTime
= 100 ; public boolean lock_setnx ( Jedis jedis
, String key
, String value
, int timeout
) { Long result
= jedis
. setnx ( key
, value
) ; if ( result
== 1L ) { return jedis
. expire ( key
, timeout
) == 1L ; } else { return false ; } } public boolean Lock_with_lua ( Jedis jedis
, String key
, String UniqueId , int seconds
) { String lua_scripts
= "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end" ; List < String > keys
= new ArrayList < > ( ) ; List < String > values
= new ArrayList < > ( ) ; keys
. add ( key
) ; values
. add ( UniqueId ) ; values
. add ( String . valueOf ( seconds
) ) ; Object result
= jedis
. eval ( lua_scripts
, keys
, values
) ; return result
. equals ( 1L ) ; } public boolean lock ( Jedis jedis
, String key
, String value
, int timeout
, TimeUnit timeUnit
) { long seconds
= timeUnit
. toSeconds ( timeout
) ; return "OK" . equals ( jedis
. set ( key
, value
, "NX" , "EX" , seconds
) ) ; } public boolean lock_with_waitTime ( Jedis jedis
, String key
, String value
, int timeout
, long waitTime
, TimeUnit timeUnit
) throws InterruptedException { long seconds
= timeUnit
. toSeconds ( timeout
) ; while ( waitTime
>= 0 ) { String result
= jedis
. set ( key
, value
, "nx" , "ex" , seconds
) ; if ( "OK" . equals ( result
) ) { return true ; } waitTime
-= sleepTime
; Thread . sleep ( sleepTime
) ; } return false ; } public void unlock_with_del ( Jedis jedis
, String key
) { jedis
. del ( key
) ; } public boolean unlock ( Jedis jedis
, String key
, String value
) { String luaScript
= "if redis.call('get',KEYS[1]) == ARGV[1] then " + "return redis.call('del',KEYS[1]) else return 0 end" ; return jedis
. eval ( luaScript
, Collections . singletonList ( key
) , Collections . singletonList ( value
) ) . equals ( 1L ) ; }
}
Controller層控制
定義一個
TestController 來測試我們實現的分布式鎖
@RestController
public class TestController { @RedisLock ( key
= "redis_lock" ) @GetMapping ( "/index" ) public String index ( ) { return "index" ; }
}
七、小結
分布式鎖重點在于互斥性,在任意一個時刻,只有一個客戶端獲取了鎖。在實際的生產環境中,分布式鎖的實現可能會更復雜,而我這里的講述主要針對的是單機環境下的基于Redis的分布式鎖實現,至于Redis集群環境并沒有過多涉及,有興趣的朋友可以參考另一篇基于redisoon實現的redis集群環境下的分布式鎖。
參考文章
總結
以上是生活随笔 為你收集整理的基于Redis的分布式锁实现 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。