redis使用sysc超时_基于redis的分布式锁实现
隨著業務越來越復雜,應用服務都會朝著分布式、集群方向部署,而分布式CAP原則告訴我們,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分區容錯性),三者不可得兼。
很多場景中,需要使用分布式事務、分布式鎖等技術來保證數據最終一致性。有的時候,我們需要保證某一方法同一時刻只能被一個線程執行。
在單機(單進程)環境中,JAVA提供了很多并發相關API,但在多機(多進程)環境中就無能為力了。
對于分布式鎖,最好能夠滿足以下幾點
可以保證在分布式部署的應用集群中,同一個方法在同一時間只能被一臺機器上的一個線程執行
這把鎖要是一把可重入鎖(避免死鎖)
這把鎖最好是一把阻塞鎖
有高可用的獲取鎖和釋放鎖功能
獲取鎖和釋放鎖的性能要好
基于數據庫實現分布式鎖
基于緩存實現分布式鎖
基于zookeeper實現分布式鎖
對于第一種(基于數據庫)及第三種(基于zookeeper)的實現方式可以參考博文http://www.hollischuang.com/a...,本篇文章介紹如何基于redis實現分布式鎖
分布式同步鎖實現
實現思路
鎖的實現主要基于redis的SETNX命令(SETNX詳細解釋參考這里),我們來看SETNX的解釋
SETNX key value
將 key 的值設為 value ,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不做任何動作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
返回值:
設置成功,返回 1 。
設置失敗,返回 0 。
使用SETNX完成同步鎖的流程及事項如下:
使用SETNX命令獲取鎖,若返回0(key已存在,鎖已存在)則獲取失敗,反之獲取成功
為了防止獲取鎖后程序出現異常,導致其他線程/進程調用SETNX命令總是返回0而進入死鎖狀態,需要為該key設置一個“合理”的過期時間
釋放鎖,使用DEL命令將鎖數據刪除
實現過程
創建同步鎖實現類
/**
* 同步鎖
*
* @property key Redis key
* @property stringRedisTemplate RedisTemplate
* @property expire Redis TTL/秒
* @property safetyTime 安全時間/秒
*/
class SyncLock(
private val key: String,
private val stringRedisTemplate: StringRedisTemplate,
private val expire: Long,
private val safetyTime: Long
)
key reids中的key,對應java api synchronized的對象
expire reids中key的過期時間
safetyTime 下文介紹其作用
實現鎖的獲取功能
private val value: String get() = Thread.currentThread().name
/**
* 嘗試獲取鎖(立即返回)
*
* @return 是否獲取成功
*/
fun tryLock(): Boolean {
val locked = stringRedisTemplate.opsForValue().setIfAbsent(key, value) ?: false
if (locked) {
stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
}
return locked
}
這里使用setIfAbsent函數(對應SETNX命令)嘗試設置key的值為value(當前線程id+線程名),若成功則同時設置key的過期時間并返回true,否則返回false
實現帶超時時間的鎖獲取功能
private val waitMillisPer: Long = 10
/**
* 嘗試獲取鎖,并至多等待timeout時長
*
* @param timeout 超時時長
* @param unit 時間單位
*
* @return 是否獲取成功
*/
fun tryLock(timeout: Long, unit: TimeUnit): Boolean {
val waitMax = unit.toMillis(timeout)
var waitAlready: Long = 0
while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
Thread.sleep(waitMillisPer)
waitAlready += waitMillisPer
}
if (waitAlready < waitMax) {
stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
return true
}
return false
}
這里使用while循環不斷嘗試鎖的獲取,并至多嘗試timeout時長,在timeout時間內若成功則同時設置key的過期時間并返回true,否則返回false
其實以上兩種tryLock函數還是有一種可能便是,在調用setIfAbsent后、調用expire之前若服務出現異常,也將導致該鎖(key)無法釋放(過期或刪除),使得其他線程/進程再無法獲取鎖而進入死循環,為了避免此問題的產生,我們引入了safetyTime
該參數的作用為,從獲取鎖開始直到safetyTime時長,若仍未獲取成功則認為某一線程/進程出現異常導致數據不正確,此時強制獲取,其實現如下
實現帶保護功能的鎖獲取功能
/**
* 獲取鎖
*/
fun lock() {
val waitMax = TimeUnit.SECONDS.toMillis(safetyTime)
var waitAlready: Long = 0
while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
Thread.sleep(waitMillisPer)
waitAlready += waitMillisPer
}
// stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
stringRedisTemplate.opsForValue().set(key, value, expire, TimeUnit.SECONDS)
}
這里同樣使用while循環不斷嘗試鎖的獲取,但至多等待safetyTime時長,最終不論是否成功,均使用SETEX命令將key設置為當前先線程對應的value,并同時設置該key的過期時間
實現鎖的釋放功能
/**
* 釋放鎖
*/
fun unLock() {
stringRedisTemplate.opsForValue()[key]?.let {
if (it == value) {
stringRedisTemplate.delete(key)
}
}
}
鎖的釋放使用DEL命令刪除key,但需要注意的是,釋放鎖時只能釋放本線程持有的鎖
若expire設置不合理,如expire設置為10秒,結果在獲取鎖后線程運行了20秒,該鎖有可能已經被其他線程強制獲取,即該key代表的鎖已經不是當前線程所持有的鎖,此時便不能冒然刪除該key,而只能釋放本線程持有的鎖。
集成spring boot
為了更好的與spring集成,我們創建一個工廠類來輔助創建同步鎖實例
/**
* SyncLock同步鎖工廠類
*/
@Component
class SyncLockFactory {
@Autowired
private lateinit var stringRedisTemplate: StringRedisTemplate
private val syncLockMap = mutableMapOf()
/**
* 創建SyncLock
*
* @param key Redis key
* @param expire Redis TTL/秒,默認10秒
* @param safetyTime 安全時間/秒,為了防止程序異常導致死鎖,在此時間后強制拿鎖,默認 expire * 5 秒
*/
@Synchronized
fun build(key: String, expire: Long = 10 /* seconds */, safetyTime: Long = expire * 5/* seconds */): SyncLock {
if (!syncLockMap.containsKey(key)) {
syncLockMap[key] = SyncLock(key, stringRedisTemplate, expire, safetyTime)
}
return syncLockMap[key]!!
}
}
在spring框架下可以更方便的使用
@Component
class SomeLogic: InitializingBean {
@Autowired
lateinit var syncLockFactory: SyncLockFactory
lateinit var syncLock
override fun afterPropertiesSet() {
syncLock = syncLockFactory.build("lock:some:name", 10)
}
fun someFun() {
syncLock.lock()
try {
// some logic
} finally {
syncLock.unlock()
}
}
}
注解的實現
借助spring aop框架,我們可以將SyncLock的使用進一步簡化
創建注解類
/**
* 同步鎖注解
*
* @property key Redis key
* @property expire Redis TTL/秒,默認10秒
*/
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class SyncLockable(
val key: String,
val expire: Long = 10
)
實現AOP
/**
* 同步鎖注解處理
*/
@Aspect
@Component
class SyncLockHandle {
@Autowired
private lateinit var syncLockFactory: SyncLockFactory
/**
* 在方法上執行同步鎖
*/
@Around("@annotation(syncLockable)")
fun syncLock(jp: ProceedingJoinPoint, syncLockable: SyncLockable): Any? {
val lock = syncLockFactory.build(syncLockable.key, syncLockable.expire)
try {
lock.lock()
return jp.proceed()
} finally {
lock.unLock()
}
}
}
如此一來,我們便可以按照如下方式使用SyncLock
@Component
class SomeLogic {
@SyncLockable("lock:some:name", 10)
fun someFun() {
// some logic
}
}
是不是顯得更加方便!
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的redis使用sysc超时_基于redis的分布式锁实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mfc 窗体不可点击的原因_如何设计一个
- 下一篇: 赣州市热水器怎么选购?有哪些技巧?