zookeeper做分布式锁
生活随笔
收集整理的這篇文章主要介紹了
zookeeper做分布式锁
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
原理
多個客戶端搶一把鎖,這些客戶端在 zookeeper 里創建各自的臨時序列node,無論什么順序,阻塞住,直到找到第一個創建的節點才放開,即第一個客戶端獲得了鎖,不是第一個的客戶端,就阻塞,直到第一個客戶端運行結束,刪除他創建的節點,讓剩下的客戶端繼續輪循直到找到第一個的過程。
結果展示
用10個線程模擬10個客戶端的結果如下:
代碼
package org.faithgreen.lock1;import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;public class LockWatchCallBack1 implements Watcher, AsyncCallback.StatCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StringCallback {private ZooKeeper zk;private String threadName;private String pathName;private CountDownLatch latch = new CountDownLatch(1);public ZooKeeper getZk() {return zk;}public void setZk(ZooKeeper zk) {this.zk = zk;}public String getThreadName() {return threadName;}public void setThreadName(String threadName) {this.threadName = threadName;}public String getPathName() {return pathName;}public void setPathName(String pathName) {this.pathName = pathName;}/*** 創建 CreateMode.EPHEMERAL_SEQUENTIAL 節點,阻塞*/public void tryLock() {zk.create("/lock", threadName.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}}/*** zk.create 的 StringCallBack impl* 如果創建完了,這個回調會被調用,獲取children** @param rc* @param path* @param o* @param childPath*/@Overridepublic void processResult(int rc, String path, Object o, String childPath) {if (childPath != null) {System.out.println(childPath + " created");pathName = childPath;zk.getChildren("/", this, this, "dev");}}/*** 獲取 children 的回調* 多個客戶端創建節點,list 總是有第一個的,如果是第一個,線程放行,否則在前一個節點上添加 watcher** @param rc* @param s* @param o* @param list*/@Overridepublic void processResult(int rc, String s, Object o, List<String> list) { // Collections.sort(list);int i = list.indexOf(pathName.substring(1));if (i == 0) {// 是第一個,就對了,獲取鎖了System.out.println(threadName + " is first ....");latch.countDown();} else {// 不是第一個,就在他的前一個加 watcherzk.exists("/" + list.get(i - 1), this, this, "dev");}}/*** children 節點的 watcher* 在釋放鎖是刪除節點的時候觸發再一次獲取 children 的過程** @param e*/@Overridepublic void process(WatchedEvent e) {switch (e.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zk.getChildren("/", false, this, "dadd");break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}@Overridepublic void processResult(int i, String s, Object o, Stat stat) {}/*** 釋放鎖的時候,刪除節點*/public void unLock() {try {System.out.println(threadName + "工作結束,釋放鎖 ....");zk.delete(pathName, -1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}} } package org.faithgreen.lock1;import org.apache.zookeeper.ZooKeeper; import org.faithgreen.conf1.ZooKeeperUtils; import org.junit.After; import org.junit.Before; import org.junit.Test;import java.io.IOException;public class TestLock1 {ZooKeeper zk;@Beforepublic void getZk() {zk = ZooKeeperUtils.getZk();}@Afterpublic void after() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void lock() {for (int i = 0; i < 10; i++) {new Thread(() -> {LockWatchCallBack1 w = new LockWatchCallBack1();String name = Thread.currentThread().getName();w.setThreadName(name);w.setZk(zk);// 獲取鎖w.tryLock();// 模擬客戶端得到鎖之后的工作System.out.println(name + " working ...");// 釋放鎖w.unLock();}).start();}try {System.in.read();} catch (IOException e) {e.printStackTrace();}} } package org.faithgreen.conf1;import org.apache.zookeeper.ZooKeeper;import java.io.IOException; import java.util.concurrent.CountDownLatch;public class ZooKeeperUtils {static ZooKeeper zk;final static String address = "192.168.172.3:2181,192.168.172.4:2181,192.168.172.5:2181,192.168.172.6:2181/testLock";static CountDownLatch latch = new CountDownLatch(1);static DefaultWatcher defaultWatcher = new DefaultWatcher();public static ZooKeeper getZk() {ZooKeeper zk = null;try {zk = new ZooKeeper(address, 3000, defaultWatcher);defaultWatcher.setLatch(latch);latch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return zk;}}總結
以上是生活随笔為你收集整理的zookeeper做分布式锁的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: zookeeper配置中心
- 下一篇: 记一个mysql分页查询优化试验