关抢占 自旋锁_关于Redis分布式锁这一篇应该是讲的最好的了,先收藏起来再看!...
作者:Nan,氣沖天.
原文:https://blog.csdn.net/qq_44209336
前言
在Java并發編程中,我們通常使用到synchronized?、Lock這兩個線程鎖,Java中的鎖,只能保證對同一個JVM中的線程有效。而在分布式集群環境,這個時候我們就需要使用到分布式鎖。
實現分布式鎖的方案
基于數據庫實現分布式鎖
基于緩存Redis實現分布式鎖
基于Zookeeper的臨時序列化節點實現分布式鎖
Redis實現分布式鎖
場景:在高并發的情況下,可能有大量請求來到數據庫查詢三級分類數據,而這種數據不會經常改變,可以引入緩存來存儲第一次從數據庫查詢出來的數據,其他線程就可以去緩存中獲取數據,來減少數據庫的查詢壓力。
在集群的環境下,就可以使用分布式鎖來控制去查詢數據庫的次數。
階段一
private Map<String, List> getCatalogJsonDBWithRedisLock() {// 去Redis中搶占位置Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");if (lock){// 搶到鎖了 執行業務Map<String, List> dataFromDb = getDataFromDb();// 刪除鎖
stringRedisTemplate.delete("lock");return dataFromDb;
}else {// 自旋獲取鎖// 休眠100mstry {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}return getCatalogJsonDBWithRedisLock();
}
}
得到鎖以后,我們應該再去緩存中確定一次,如果沒有才需要繼續查詢,從數據庫查到數據以后,應該先把數據放入緩存中,再將數據返回。
private Map<String, List> getDataFromDb() {// 得到鎖以后,我們應該再去緩存中確定一次,如果沒有才需要繼續查詢String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson");if (!StringUtils.isEmpty(catalogJson)) {// 反序列化 轉換為指定對象Map<String, List> result = JSON.parseObject(catalogJson, newTypeReference<Map<String, List>>() {});return result;
}
System.out.println("查詢數據庫了......");// 查詢所有分類數據在進行刷選List categoryEntityList = baseMapper.selectList(null);// 查詢一級分類List leave1Categorys = getParent_cid(categoryEntityList, 0L);Map<String, List> listMap = leave1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), l1 -> {List categoryL2List = getParent_cid(categoryEntityList, l1.getCatId());List catelog2Vos = null;if (categoryL2List != null) {
catelog2Vos = categoryL2List.stream().map(l2 -> {
Catelog2Vo catelog2Vo = new Catelog2Vo(l2.getParentCid().toString(), null, l2.getCatId().toString(), l2.getName());List categoryL3List = getParent_cid(categoryEntityList,
l2.getCatId());if (categoryL3List != null) {List catelog3Vos =
categoryL3List.stream().map(l3 -> {
Catelog2Vo.Catelog3Vo catelog3Vo = new
Catelog2Vo.Catelog3Vo(l2.getCatId().toString(),
l3.getCatId().toString(),
l3.getName());return catelog3Vo;
}).collect(Collectors.toList());
catelog2Vo.setCatalog3List(catelog3Vos);
}return catelog2Vo;
}).collect(Collectors.toList());
}return catelog2Vos;
}));// 最后需將數據加入的緩存中String jsonString = JSON.toJSONString(listMap);
stringRedisTemplate.opsForValue().set("catalogJson", jsonString, 1L,
TimeUnit.DAYS);return listMap;
}
階段二
private Map<String, List> getCatalogJsonDBWithRedisLock() {// 去Redis中搶占位置Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");if (lock){// 搶到鎖了 執行業務// 設置過期時間
stringRedisTemplate.expire("lock",3,TimeUnit.SECONDS);Map<String, List> dataFromDb = getDataFromDb();// 刪除鎖
stringRedisTemplate.delete("lock");return dataFromDb;
}else {// 自旋獲取鎖// 休眠100mstry {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}return getCatalogJsonDBWithRedisLock();
}
}
階段三
private Map<String, List> getCatalogJsonDBWithRedisLock() {// 去Redis中搶占位置 保證原子性Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",300,TimeUnit.SECONDS);if (lock){// 搶到鎖了 執行業務Map<String, List> dataFromDb = getDataFromDb();// 刪除鎖
stringRedisTemplate.delete("lock");return dataFromDb;
}else {// 自旋獲取鎖// 休眠100mstry {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}return getCatalogJsonDBWithRedisLock();
}
}
階段四
private Map<String, List> getCatalogJsonDBWithRedisLock() {String uuid = UUID.randomUUID().toString();// 去Redis中搶占位置 保證原子性Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);if (lock){// 搶到鎖了 執行業務Map<String, List> dataFromDb = getDataFromDb();String s = stringRedisTemplate.opsForValue().get("lock");if (uuid.equals(s)){// 刪除鎖
stringRedisTemplate.delete("lock");
}return dataFromDb;
}else {// 自旋獲取鎖// 休眠100mstry {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}return getCatalogJsonDBWithRedisLock();
}
}
階段五
private Map<String, List> getCatalogJsonDBWithRedisLock() {String uuid = UUID.randomUUID().toString();// 去Redis中搶占位置 保證原子性Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);if (lock){// 搶到鎖了 執行業務Map<String, List> dataFromDb = getDataFromDb();String s = stringRedisTemplate.opsForValue().get("lock");// 獲取值對比+對比成功刪除=原子操作 Lua腳本解鎖String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +" return redis.call(\"del\",KEYS[1])\n" +"else\n" +" return 0\n" +"end";
Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript(script, Long.class) , Arrays.asList("lock"), uuid);return dataFromDb;
}else {// 自旋獲取鎖// 休眠100mstry {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}return getCatalogJsonDBWithRedisLock();
}
}
小總結
stringRedisTemplate.opsForValue().setIfAbsent(“lock”, uuid,300,TimeUnit.SECONDS);
stringRedisTemplate.execute(new DefaultRedisScript(script, Long.class) , Arrays.asList(“lock”), uuid);
使用Redis來實現分布式鎖需保證加鎖【占位+過期時間】和刪除鎖【判斷+刪除】操作的原子性。
Redis鎖的過期時間小于業務的執行時間該如何自動續期?
設置一個比業務耗時更長的過期時間
Redisson的看門狗機制
Redisson實現分布式鎖
Redisson 簡介
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet,?Set,?Multimap,?SortedSet,?Map,?List,?Queue,?BlockingQueue,?Deque,?BlockingDeque,?Semaphore,?Lock,?AtomicLong,?CountDownLatch,?Publish / Subscribe,?Bloom filter,?Remote service,?Spring cache,?Executor service,?Live Object service,?Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
原理機制
集成Spring Boot 項目
引入依賴 【可引入Spring Boot 封裝好的starter】
org.redisson
redisson
3.12.0
添加配置類
@Configurationpublic class MyRedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient(){
// 創建配置 記得加redis://
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.26.104:6379");
// 根據配置創建RedissClient客戶端
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
可重入鎖 Reentrant Lock
獲取一把鎖 redissonClient.getLock(“my-lock”);
給業務代碼加鎖 lock.lock();
解鎖 lock.unlock();
看門狗機制 鎖會自動續期
@GetMapping("/hello")
public String hello(){
// 1、獲取一把鎖,只要鎖的名字一樣,就是同一把鎖
RLock lock = redissonClient.getLock("my-lock");
// 加鎖
// 阻塞式等待,默認加的鎖都是【看門狗時間】30s時間
//1)、鎖的自動續期,如果業務超長,運行期間自動給鎖續上新的30s,不用擔心業務時間長,鎖自動過期被刪掉
//2)、加鎖的業務只要運行完成,就不會給當前鎖續期,即使不手動解鎖,鎖默認在30s以后自動刪除
lock.lock();
try {
System.out.println("加鎖成功......."+Thread.currentThread().getId());
Thread.sleep(30000);
} catch (InterruptedException e) {
}finally {
// 釋放鎖 不會出現死鎖狀態 如果沒有執行解鎖,鎖有過期時間,過期了會將鎖刪除
lock.unlock();
System.out.println("解鎖成功......"+Thread.currentThread().getId());
}
return "hello";
}
lock方法有一個重載方法 lock(long leaseTime, TimeUnit unit)
public void lock(long leaseTime, TimeUnit unit) {try {
this.lock(leaseTime, unit, false);
} catch (InterruptedException var5) {
throw new IllegalStateException();
}
}
注意:指定了過期時間后,不會進行自動續期,此時如果有多個線程,即便業務還然在執行,過期時間到了之后,鎖就會被釋放,其他線程就會爭搶到鎖。
二個方法對比
如果設置了過期時間,就會發生執行腳本給Redis,進行占鎖,設置過期時間為我們指定的時間。
未設置過期時間,就會使用看門狗的默認時間LockWatchdogTimeout 30*1000
只有沒有指定過期的時間的方法才有自動續期功能
自動續期實現機制 :只要占鎖成功,就會自動啟動一個定時任務【重新給鎖設置過期時間,新的過期時間就是看門狗的默認時間】,每隔10s【( internalLockLeasTime)/3】都會自動續期。
持有鎖的機器宕機問題,因為來不及續期,所以鎖自動被釋放,當該機再次恢復時,因為其后臺守護線程是ScheduleTask,所以恢復后會馬上執行一次watchDog續期邏輯,執行過程中,它會感知到自己已經丟失了鎖,所以不存在共同持有的問題。
讀寫鎖 ReadWriteLock
保證一定能讀到最新數據,修改期間,寫鎖是一個互斥鎖,讀鎖是一個共享鎖。
寫+讀 寫鎖沒有釋放,讀鎖就得等待
寫+寫 阻塞方式
讀+寫 寫鎖等待讀鎖釋放才能加鎖
讀+讀 相當于無鎖,并發讀
@GetMapping("/write")
public String writeLock(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.writeLock();
String s = "";
try {
rLock.lock();
System.out.println("寫鎖加鎖成功......"+Thread.currentThread().getId());
s = UUID.randomUUID().toString();
stringRedisTemplate.opsForValue().set("writeLock",s);
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
rLock.unlock();
System.out.println("寫鎖釋放成功......"+Thread.currentThread().getId());
}
return s;
}
@ResponseBody
@GetMapping("/read")
public String readLock(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.readLock();
rLock.lock();
String s = "";
try{
System.out.println("讀鎖加鎖成功......"+Thread.currentThread().getId());
s = stringRedisTemplate.opsForValue().get("writeLock");
}catch (Exception e){
e.printStackTrace();
}finally {
System.out.println("讀鎖釋放成功......"+Thread.currentThread().getId());
rLock.unlock();
}
return s;
}
信號量 Semaphore
使用信號量來做分布式限流
@ResponseBody@GetMapping("/park")
public String park() throws InterruptedException {
RSemaphore park = redissonClient.getSemaphore("park");
// 搶占一個車位
boolean b = park.tryAcquire();
// 如果還可以搶占到 就執行業務代碼
if (b){
// 執行業務代碼
}else {
return "error";
}
return "ok=>"+b;
}
@ResponseBody
@GetMapping("/go")
public String go() {
RSemaphore park = redissonClient.getSemaphore("park");
// 釋放一個車位
park.release();
return "ok";
}
閉鎖 CountDownLatch
模擬場景:等待班級放學走了,保安關校門。
@ResponseBody@GetMapping("/lockdoor")
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redissonClient.getCountDownLatch("door");
door.trySetCount(5);
// 等待閉鎖完成
door.await();
return "放假了.....";
}
@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable("id")Long id){
RCountDownLatch door = redissonClient.getCountDownLatch("door");
// 計數減一
door.countDown();
return id+"班級走了....";
}
Redisson解決上面Redis查詢問題
/*** 使用Redisson分布式鎖來實現多個服務共享同一緩存中的數據
* @return
*/
private Map<String, List> getCatalogJsonDBWithRedissonLock() {
RLock lock = redissonClient.getLock("catalogJson-lock");// 該方法會阻塞其他線程向下執行,只有釋放鎖之后才會接著向下執行
lock.lock();Map<String, List> dataFromDb = null;try {
dataFromDb = getDataFromDb();
}finally {
lock.unlock();
}return dataFromDb;
}
總結
以上是生活随笔為你收集整理的关抢占 自旋锁_关于Redis分布式锁这一篇应该是讲的最好的了,先收藏起来再看!...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 蔚来汽车3月交付10378台 目标今年销
- 下一篇: 意大利宣布禁用 ChatGPT 聊天机器