基于 Redis 实现的分布式锁
??點擊上方?好好學(xué)java?,選擇?星標(biāo)?公眾號
重磅資訊、干貨,第一時間送達(dá) 今日推薦:我的大學(xué)到研究生自學(xué) Java 之路,過程艱辛,不放棄,保持熱情,最終發(fā)現(xiàn)我是這樣拿到大廠 offer 的!作者:你的笑像一條狗 鏈接:https://segmentfault.com/a/1190000022533998基于Redis實現(xiàn)的分布式鎖
Spring Cloud 分布式環(huán)境下,同一個服務(wù)都是部署在不同的機器上,這種情況無法像單體架構(gòu)下數(shù)據(jù)一致性問題采用加鎖就實現(xiàn)數(shù)據(jù)一致性問題,在高并發(fā)情況下,對于分布式架構(gòu)顯然是不合適的,針對這種情況我們就需要用到分布式鎖了。
哪些場景需要用分布式鎖
場景一:比較敏感的數(shù)據(jù)比如金額修改,同一時間只能有一個人操作,想象下2個人同時修改金額,一個加金額一個減金額,為了防止同時操作造成數(shù)據(jù)不一致,需要鎖,如果是數(shù)據(jù)庫需要的就是行鎖或表鎖,如果是在集群里,多個客戶端同時修改一個共享的數(shù)據(jù)就需要分布式鎖。
場景二:比如多臺機器都可以定時執(zhí)行某個任務(wù),如果限制任務(wù)每次只能被一臺機器執(zhí)行,不能重復(fù)執(zhí)行,就可以用分布式鎖來做標(biāo)記。
場景三:比如秒殺場景,要求并發(fā)量很高,那么同一件商品只能被一個用戶搶到,那么就可以使用分布式鎖實現(xiàn)。
分布式鎖實現(xiàn)方式:
1、基于數(shù)據(jù)庫實現(xiàn)分布式鎖
2、基于緩存(redis,memcached,tair)實現(xiàn)分布式鎖
3、基于Zookeeper實現(xiàn)分布式鎖
為什么不使用數(shù)據(jù)庫?
數(shù)據(jù)庫是單點?搞兩個數(shù)據(jù)庫,數(shù)據(jù)之前雙向同步。一旦掛掉快速切換到備庫上。
沒有失效時間?只要做一個定時任務(wù),每隔一定時間把數(shù)據(jù)庫中的超時數(shù)據(jù)清理一遍。
非阻塞的?搞一個while循環(huán),直到insert成功再返回成功。
非重入的?在數(shù)據(jù)庫表中加個字段,記錄當(dāng)前獲得鎖的機器的主機信息和線程信息,那么下次再獲取鎖的時候先查詢數(shù)據(jù)庫,如果當(dāng)前機器的主機信息和線程信息在數(shù)據(jù)庫可以查到的話,直接把鎖分配給他就可以了。
大量請求下數(shù)據(jù)庫往往是系統(tǒng)的瓶頸,大量連接,然后sql查詢,幾乎所有時間都浪費到這些上面,所以往往情況下能內(nèi)存操作就在內(nèi)存操作,使用基于內(nèi)存操作的Redis實現(xiàn)分布式鎖,也可以根據(jù)需求選擇ZooKeeper 來實現(xiàn)。
通過 Redis 的 Redlock 和 ZooKeeper 來加鎖,性能有了比較大的提升,一般情況我們根據(jù)實際場景選擇使用。
分布式鎖應(yīng)該滿足要求
互斥性 可以保證在分布式部署的應(yīng)用集群中,同一個方法在同一時間只能被一臺機器上的一個線程執(zhí)行。
這把鎖要是一把可重入鎖(避免死鎖)
不會發(fā)生死鎖:有一個客戶端在持有鎖的過程中崩潰而沒有解鎖,也能保證其他客戶端能夠加鎖
這把鎖最好是一把阻塞鎖(根據(jù)業(yè)務(wù)需求考慮要不要這條)
有高可用的獲取鎖和釋放鎖功能
獲取鎖和釋放鎖的性能要好
Redis實現(xiàn)分布式鎖
Redis實現(xiàn)分布式鎖利用 SETNX 和 SETEX
基本命令主要有:
SETNX(SET If Not Exists):當(dāng)且僅當(dāng) Key 不存在時,則可以設(shè)置,否則不做任何動作。
當(dāng)且僅當(dāng) key 不存在,將 key 的值設(shè)為 value ,并返回1;若給定的 key 已經(jīng)存在,則 SETNX 不做任何動作,并返回0。
SETEX:基于SETNX功能外,還可以設(shè)置超時時間,防止死鎖。
分布式鎖
分布式鎖其實大白話,本質(zhì)上要實現(xiàn)的目標(biāo)(客戶端)在redis中占一個位置,等到這個客戶試用,別的人進(jìn)來就必須得等著,等我試用完了,走了,你再來。 感覺跟多線程鎖一樣,意思大致是一樣的,多線程是針對單機的,在同一個Jvm中,但是分布式石鎖,是跨機器的,多個進(jìn)程不同機器上發(fā)來得請求,去對同一個數(shù)據(jù)進(jìn)行操作。
比如,分布式架構(gòu)下的秒殺系統(tǒng),幾萬人對10個商品進(jìn)行搶購,10個商品存在redis中,就是表示10個位置,第一個人進(jìn)來了,商品就剩9個了,第二個人進(jìn)來就剩8個,在第一個人進(jìn)來的時候,其他人必須等到10個商品數(shù)量成功減去1之后你才能進(jìn)來。
這個過程中第一個人進(jìn)來的時候還沒操作減1然后異常了,沒有釋放鎖,然后后面人一直等待著,這就是死鎖。真對這種情況可以設(shè)置超時時間,如果超過10s中還是沒出來,就讓他超時失效。
redis中提供了 setnx(set if not exists) 指令
> setnx lock:codehole true -- 鎖定 OK ... do something xxxx... 數(shù)量減1 > del lock:codehole -- 釋放鎖 (integer) 1 --成功如果在減1期間發(fā)生異常 del 指令沒有被調(diào)用 然后就一直等著,鎖永遠(yuǎn)不會釋放。
redis Redis 2.8 版本中提供了 setex(set if not exists) 指令 setnx 和 expire 兩個指令構(gòu)成一個原子操作 給鎖加上一個過期時間
> setex lock:codehole true OK > expire lock:codehole 5 ... do something xxxx ... > del lock:codehole (integer) 1SETEX 實現(xiàn)原理
通過 SETNX 設(shè)置 Key-Value 來獲得鎖,隨即進(jìn)入死循環(huán),每次循環(huán)判斷,如果存在 Key 則繼續(xù)循環(huán),如果不存在 Key,則跳出循環(huán),當(dāng)前任務(wù)執(zhí)行完成后,刪除 Key 以釋放鎖。
實現(xiàn)步驟
pom.xml 導(dǎo)入Redis依賴
<!-- redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version><scope>provided</scope></dependency>添加配置文件 application.yml:
server:port: 8080spring:profiles: devdata:redis:# Redis數(shù)據(jù)庫索引(默認(rèn)為0)database: 0# Redis服務(wù)器地址host: 127.0.0.1# Redis服務(wù)器連接端口port: 6379# Redis服務(wù)器連接密碼(默認(rèn)為空)password:全局鎖類
@Data public?class?Lock?{/***?key名*/private?String?name;/***?value值*/private?String?value;public?Lock(String?name,?String?value)?{this.name?=?name;this.value?=?value;}}分布式鎖類
@Slf4j @Component public?class?DistributedLockConfig?{/***?單個業(yè)務(wù)持有鎖的時間30s,防止死鎖*/private?final?static?long?LOCK_EXPIRE?=?30?*?1000L;/***?默認(rèn)30ms嘗試一次*/private?final?static?long?LOCK_TRY_INTERVAL?=?30L;/***?默認(rèn)嘗試20s*/private?final?static?long?LOCK_TRY_TIMEOUT?=?20?*?1000L;private?RedisTemplate?template;public?void?setTemplate(RedisTemplate?template)?{this.template?=?template;}/***?嘗試獲取全局鎖**?@param?lock?鎖的名稱*?@return?true?獲取成功,false獲取失敗*/public?boolean?tryLock(Lock?lock)?{return?getLock(lock,?LOCK_TRY_TIMEOUT,?LOCK_TRY_INTERVAL,?LOCK_EXPIRE);}/***?嘗試獲取全局鎖*?SETEX:可以設(shè)置超時時間**?@param?lock????鎖的名稱*?@param?timeout?獲取超時時間?單位ms*?@return?true?獲取成功,false獲取失敗*/public?boolean?tryLock(Lock?lock,?long?timeout)?{return?getLock(lock,?timeout,?LOCK_TRY_INTERVAL,?LOCK_EXPIRE);}/***?嘗試獲取全局鎖**?@param?lock????????鎖的名稱*?@param?timeout?????獲取鎖的超時時間*?@param?tryInterval?多少毫秒嘗試獲取一次*?@return?true?獲取成功,false獲取失敗*/public?boolean?tryLock(Lock?lock,?long?timeout,?long?tryInterval)?{return?getLock(lock,?timeout,?tryInterval,?LOCK_EXPIRE);}/***?嘗試獲取全局鎖**?@param?lock???????????鎖的名稱*?@param?timeout????????獲取鎖的超時時間*?@param?tryInterval????多少毫秒嘗試獲取一次*?@param?lockExpireTime?鎖的過期*?@return?true?獲取成功,false獲取失敗*/public?boolean?tryLock(Lock?lock,?long?timeout,?long?tryInterval,?long?lockExpireTime)?{return?getLock(lock,?timeout,?tryInterval,?lockExpireTime);}/***?操作redis獲取全局鎖**?@param?lock???????????鎖的名稱*?@param?timeout????????獲取的超時時間*?@param?tryInterval????多少ms嘗試一次*?@param?lockExpireTime?獲取成功后鎖的過期時間*?@return?true?獲取成功,false獲取失敗*/public?boolean?getLock(Lock?lock,?long?timeout,?long?tryInterval,?long?lockExpireTime)?{try?{if?(StringUtils.isEmpty(lock.getName())?||?StringUtils.isEmpty(lock.getValue()))?{return?false;}long?startTime?=?System.currentTimeMillis();do?{if?(!template.hasKey(lock.getName()))?{ValueOperations<String,?String>?ops?=?template.opsForValue();ops.set(lock.getName(),?lock.getValue(),?lockExpireTime,?TimeUnit.MILLISECONDS);return?true;}?else?{//存在鎖log.debug("lock?is?exist!!!");}//嘗試超過了設(shè)定值之后直接跳出循環(huán)if?(System.currentTimeMillis()?-?startTime?>?timeout)?{return?false;}//每隔多長時間嘗試獲取Thread.sleep(tryInterval);}while?(template.hasKey(lock.getName()));}?catch?(InterruptedException?e)?{log.error(e.getMessage());return?false;}return?false;}/***?獲取鎖*?SETNX(SET?If?Not?Exists):當(dāng)且僅當(dāng)?Key?不存在時,則可以設(shè)置,否則不做任何動作。*/public?Boolean?getLockNoTime(Lock?lock)?{if?(!StringUtils.isEmpty(lock.getName()))?{return?false;}//?setIfAbsent?底層封裝命令?是?setNX()boolean?falg?=?template.opsForValue().setIfAbsent(lock.getName(),?lock.getValue());return?false;}/***?釋放鎖*/public?void?releaseLock(Lock?lock)?{if?(!StringUtils.isEmpty(lock.getName()))?{template.delete(lock.getName());}}}測試方法
@RequestMapping("test")public?String?index()?{distributedLockConfig.setTemplate(redisTemplate);Lock?lock?=?new?Lock("test",?"test");if?(distributedLockConfig.tryLock(lock))?{try?{//為了演示鎖的效果,這里睡眠5000毫秒System.out.println("執(zhí)行方法");Thread.sleep(5000);}?catch?(Exception?e)?{e.printStackTrace();}distributedLockConfig.releaseLock(lock);}return?"hello?world!";}開啟兩個瀏覽器窗口,執(zhí)行方法,我們可以看到兩個瀏覽器在等待執(zhí)行,當(dāng)一個返回 hello world! 之后,如果沒超時執(zhí)行另一個也會返回hello world! 兩個方法彼此先后返回,說明分布式鎖執(zhí)行成功。
但是存在一個問題:
這段方法是先去查詢key是否存在redis中,如果存在走循環(huán),然后根據(jù)間隔時間去等待嘗試獲取,如果不存在則進(jìn)行獲取鎖,如果等待時間超過超時時間返回false。
1 這種方式性能問題很差,每次獲取鎖都要進(jìn)行等待,很是浪費資源,
2 如果在判斷鎖是否存在這兒2個或者2個以上的線程都查到redis中存在key,同一時刻就無法保證一個客戶端持有鎖,不具有排他性。
如果在集群環(huán)境下也會存在問題
假如在哨兵模式中 主節(jié)點獲取到鎖之后,數(shù)據(jù)沒有同步到從節(jié)點主節(jié)點掛掉了,這樣數(shù)據(jù)完整性不能保證,另一個客戶端請求過來,就會一把鎖被兩個客戶端持有,會導(dǎo)致數(shù)據(jù)一致性出問題。
在這里插入圖片描述對此Redis中還提供了另外一種實現(xiàn)分布式鎖的方法 Redlock
利用 Redlock
Redlock是redis官方提出的實現(xiàn)分布式鎖管理器的算法。這個算法會比一般的普通方法更加安全可靠。
為什么選擇紅鎖? 在集群中需要半數(shù)以上的節(jié)點同意才能獲得鎖,保證了數(shù)據(jù)的完整性,不會因為主節(jié)點數(shù)據(jù)存在,主節(jié)點掛了之后沒有同步到從節(jié)點,導(dǎo)致數(shù)據(jù)丟失。
Redlock 算法
使用場景
對于Redis集群模式盡量采用這種分布式鎖,保證高可用,數(shù)據(jù)一致性,就使用Redlock 分布式鎖。
pom.xml 增加依賴
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.7.0</version> </dependency>獲取鎖后需要處理的邏輯
/***?獲取鎖后需要處理的邏輯*/ public?interface?AquiredLockWorker<T>?{T?invokeAfterLockAquire()?throws?Exception; }獲取鎖管理類
/***?獲取鎖管理類*/ public?interface?DistributedLocker?{/***?獲取鎖*?@param?resourceName??鎖的名稱*?@param?worker?獲取鎖后的處理類*?@param?<T>*?@return?處理完具體的業(yè)務(wù)邏輯要返回的數(shù)據(jù)*?@throws?UnableToAquireLockException*?@throws?Exception*/<T>?T?lock(String?resourceName,?AquiredLockWorker<T>?worker)?throws?UnableToAquireLockException,?Exception;<T>?T?lock(String?resourceName,?AquiredLockWorker<T>?worker,?int?lockTime)?throws?UnableToAquireLockException,?Exception;}異常類
/***?異常類*/ public?class?UnableToAquireLockException?extends?RuntimeException?{public?UnableToAquireLockException()?{}public?UnableToAquireLockException(String?message)?{super(message);}public?UnableToAquireLockException(String?message,?Throwable?cause)?{super(message,?cause);} }獲取RedissonClient連接類
/***?獲取RedissonClient連接類*/ @Component public?class?RedissonConnector?{RedissonClient?redisson;@PostConstructpublic?void?init(){redisson?=?Redisson.create();}public?RedissonClient?getClient(){return?redisson;}}分布式鎖實現(xiàn)
@Component public?class?RedisLocker??implements?DistributedLocker{private?final?static?String?LOCKER_PREFIX?=?"lock:";@AutowiredRedissonConnector?redissonConnector;@Overridepublic?<T>?T?lock(String?resourceName,?AquiredLockWorker<T>?worker)?throws?InterruptedException,?UnableToAquireLockException,?Exception?{return?lock(resourceName,?worker,?100);}@Overridepublic?<T>?T?lock(String?resourceName,?AquiredLockWorker<T>?worker,?int?lockTime)?throws?UnableToAquireLockException,?Exception?{RedissonClient?redisson=?redissonConnector.getClient();RLock?lock?=?redisson.getLock(LOCKER_PREFIX?+?resourceName);//?Wait?for?100?seconds?seconds?and?automatically?unlock?it?after?lockTime?secondsboolean?success?=?lock.tryLock(100,?lockTime,?TimeUnit.SECONDS);if?(success)?{try?{return?worker.invokeAfterLockAquire();}?finally?{lock.unlock();}}throw?new?UnableToAquireLockException();} }測試方法
ScheduledExecutorService?scheduledExecutorService?=?Executors.newScheduledThreadPool(10);for?(int?i?=?0;?i?<?50;?i++)?{scheduledExecutorService.execute(new?Worker());}scheduledExecutorService.shutdown();//任務(wù)class?Worker?implements?Runnable?{public?Worker()?{}@Overridepublic?void?run()?{try?{redisLocker.lock("tizz1100",?new?AquiredLockWorker<Object>()?{@Overridepublic?Object?invokeAfterLockAquire()?{doTask();return?null;}});}?catch?(Exception?e)?{}}void?doTask()?{System.out.println(Thread.currentThread().getName()?+?"?----------?"?+?LocalDateTime.now());System.out.println(Thread.currentThread().getName()?+?"?start");Random?random?=?new?Random();int?_int?=?random.nextInt(200);System.out.println(Thread.currentThread().getName()?+?"?sleep?"?+?_int?+?"millis");try?{Thread.sleep(_int);}?catch?(InterruptedException?e)?{e.printStackTrace();}System.out.println(Thread.currentThread().getName()?+?"?end");}}參考資料:
https://blog.csdn.net/yue_201...
https://blog.csdn.net/weixin_...
https://github.com/pomestyle/SpringBoot/tree/master/Springboot-Redis-SETEX
最后,再附上我歷時三個月總結(jié)的?Java 面試 + Java 后端技術(shù)學(xué)習(xí)指南,這是本人這幾年及春招的總結(jié),目前,已經(jīng)拿到了騰訊等大廠offer,拿去不謝,github 地址:https://github.com/OUYANGSIHAI/JavaInterview
這么辛苦總結(jié),給個star好不好。?點擊閱讀原文,直達(dá)
總結(jié)
以上是生活随笔為你收集整理的基于 Redis 实现的分布式锁的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Springboot 整合微信小程序实现
- 下一篇: 为 hexo 博客添加本地搜索功能