api 创建zookeeper客户端_zookeeper分布式锁原理及实现
生活随笔
收集整理的這篇文章主要介紹了
api 创建zookeeper客户端_zookeeper分布式锁原理及实现
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
前言
本文介紹下 zookeeper方式 實現分布式鎖
原理簡介
zookeeper實現分布式鎖的原理就是多個節點同時在一個指定的節點下面創建臨時會話順序節點,誰創建的節點序號最小,誰就獲得了鎖,并且其他節點就會監聽序號比自己小的節點,一旦序號比自己小的節點被刪除了,其他節點就會得到相應的事件,然后查看自己是否為序號最小的節點,如果是,則獲取鎖
docker 安裝 zk
下載鏡像
docker pull zookeeper
啟動鏡像
docker run --name zk -p 2181:2181 -p 2888:2888 -p 3888:3888 --restart always -d zookeeper-p 端口映射
--name 容器實例名稱
-d 后臺運行
2181 Zookeeper客戶端交互端口
2888 Zookeeper集群端口
3888 Zookeeper選舉端口
查看容器
docker ps |grep zookeeper
zk簡單的幾個操作命令
進入docker容器
docker exec -it 942142604a46 bash
查看節點狀態
./bin/zkServer.sh status
開啟客戶端
./bin/zkCli.sh
創建臨時節點
create -e /node1 node1.1創建臨時節點,當客戶端關閉時候,該節點會隨之刪除。不加參數-e創建永久節點
獲取節點值
get /node
列出節點值
ls /node
刪除節點值
delete /node
查看節點信息
stat /test
先介紹下zk的客戶端框架Curator
簡介
Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊Watcher和NodeExistsException異常等
Curator的maven依賴
介紹下Curator的基本API
- 使用靜態工程方法創建會話
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",5000, 5000, retryPolicy);RetryPolicy為重試策略第一個參數為baseSleepTimeMs初始的sleep時間,用于計算之后的每次重試的sleep時間。第二個參數為maxRetries,最大重試次數
- 使用Fluent風格api創建
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(5000) // 會話超時時間.connectionTimeoutMs(5000) // 連接超時時間.retryPolicy(retryPolicy).namespace("base") // 包含隔離名稱.build();client.start();
- 創建數據節點
lient.create().creatingParentContainersIfNeeded() // 遞歸創建所需父節點.withMode(CreateMode.PERSISTENT) // 創建類型為持久節點.forPath("/nodeA", "init".getBytes()); // 目錄及內容
- 刪除數據節點
client.delete().guaranteed() // 強制保證刪除.deletingChildrenIfNeeded() // 遞歸刪除子節點.withVersion(10086) // 指定刪除的版本號.forPath("/nodeA");
- 讀取數據節點
byte[] bytes = client.getData().forPath("/nodeA");System.out.println(new String(bytes));
- 讀stat
Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/nodeA");
- 修改數據節點
client.setData().withVersion(10086) // 指定版本修改.forPath("/nodeA", "data".getBytes());
- 事務
client.inTransaction().check().forPath("/nodeA").and().create().withMode(CreateMode.EPHEMERAL).forPath("/nodeB", "init".getBytes()).and().create().withMode(CreateMode.EPHEMERAL).forPath("/nodeC", "init".getBytes()).and().commit();
- 其他
client.checkExists() // 檢查是否存在.forPath("/nodeA");client.getChildren().forPath("/nodeA"); // 獲取子節點的路徑
- 異步回調
Executor executor = Executors.newFixedThreadPool(2);client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((curatorFramework, curatorEvent) -> {System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));},executor).forPath("path");
zk分布式實現的代碼分析
先說下這個test方法 描述了 獲取zk鎖的完整流程
再說下 如何通過訪問接口的方式的實現
目錄結構
初始化zk客戶端連接
zk 客戶端申請、釋放鎖實現
- 實現了 InitializingBean, DisposableBean接口
應用在啟動的時候(client.start方法執行的時候)zookeeper客戶端就會和zookeeper服務器時間建立會話,系統關閉時,客戶端與zookeeper服務器的會話就關閉了
定義一個抽象的業務處理接口
單個線程獲取zk鎖
多個線程獲取zk鎖
創建線程池ExecutorService executorService = Executors.newFixedThreadPool(20);20個線程同時發起對同一個zk鎖的獲取申請
Curator 源碼分析
會話的建立與關閉
在client.start調用后,就會創建與zookeeper服務器之間的會話鏈接
系統關閉時 會話就會斷開
- client.start 源碼分析
- 啟動日志
- 關閉日志
- 系統啟動時zk的日志
- 系統關閉時zk的日志
- 訪問多線程獲取zk鎖接口
curl http://127.0.0.1:8080/batch-acquire-lock查看zk鎖情況查看日志
20個線程同時獲取鎖 會在/lock-path下面創建20個臨時節點 序號從0-19 只有創建序號0的臨時節點的那個線程才會成功獲取得鎖 其他的沒有獲取鎖的臨時節點會刪除
此時那個獲得zk鎖的線程如果使用鎖完畢之后如果不釋放鎖 這個鎖對應的臨時節點還會存在
由此也會看出一個缺點臨時會話順序節點會被刪除,但是它們的父節點/lock-path不會被刪除。因此,高并發的業務場景下使用zookeeper分布式鎖時,會留下很多的空節點節點創建
跟蹤lock.acquire(200, TimeUnit.MILLISECONDS)進入到
org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver#createsTheLock
創建的節點為臨時會話順序節點(EPHEMERAL_SEQUENTIAL)
即該節點會在客戶端鏈接斷開時被刪除,還有,我們調用org.apache.curator.framework.recipes.locks.InterProcessMutex#release時也會刪除該節點
可重入性
跟蹤獲取鎖的代碼進入到org.apache.curator.framework.recipes.locks.InterProcessMutex#internalLock
可以看見zookeeper的鎖是可重入的,即同一個線程可以多次獲取鎖,只有第一次真正的去創建臨時會話順序節點,后面的獲取鎖都是對重入次數加1。相應的,在釋放鎖的時候,前面都是對鎖的重入次數減1,只有最后一次才是真正的去刪除節點
客戶端故障檢測:
正常情況下,客戶端會在會話的有效期內,向服務器端發送PING 請求,來進行心跳檢查,說明自己還是存活的。服務器端接收到客戶端的請求后,會進行對應的客戶端的會話激活,會話激活就會延長該會話的存活期。如果有會話一直沒有激活,那么說明該客戶端出問題了,服務器端的會話超時檢測任務就會檢查出那些一直沒有被激活的與客戶端的會話,然后進行清理,清理中有一步就是刪除臨時會話節點(包括臨時會話順序節點)。這就保證了zookeeper分布鎖的容錯性,不會因為客戶端的意外退出,導致鎖一直不釋放,其他客戶端獲取不到鎖。數據一致性:
zookeeper服務器集群一般由一個leader節點和其他的follower節點組成,數據的讀寫都是在leader節點上進行。當一個寫請求過來時,leader節點會發起一個proposal,待大多數follower節點都返回ack之后,再發起commit,待大多數follower節點都對這個proposal進行commit了,leader才會對客戶端返回請求成功;如果之后leader掛掉了,那么由于zookeeper集群的leader選舉算法采用zab協議保證數據最新的follower節點當選為新的leader,所以,新的leader節點上都會有原來leader節點上提交的所有數據。這樣就保證了客戶端請求數據的一致性了。CAP:
任何分布式架構都不能同時滿足C(一致性)、A(可用性)、P(分區耐受性),因此,zookeeper集群在保證一致性的同時,在A和P之間做了取舍,最終選擇了P,因此可用性差一點。
綜上所述
zookeeper分布式鎖保證了鎖的容錯性、一致性。但是會產生空閑節點(/lock-path),并且有些時候不可用。
源碼
https://gitee.com/pingfanrenbiji/distributed-lock/tree/master/zookeeper
引用文章
https://my.oschina.net/yangjianzhou/blog/1930493
https://www.jianshu.com/p/db65b64f38aa
總結
以上是生活随笔為你收集整理的api 创建zookeeper客户端_zookeeper分布式锁原理及实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 希腊神话作者是谁啊?
- 下一篇: pythontype函数使用_基础教程: