大数据之Zookeeper
文章目錄
- 1. Zookeeper 入門
- 1.1 概述
- 1.2 特點(diǎn)
- 1.3 數(shù)據(jù)結(jié)構(gòu)
- 1.4 應(yīng)用場(chǎng)景
- 2. Zookeeper 安裝
- 2.1 下載地址
- 2.2 本地模式安裝部署
- 2.3 分布式安裝部署
- 2.4 配置參數(shù)解讀
- 3. Zookeeper 內(nèi)部原理
- 3.1 選舉機(jī)制
- 3.2 節(jié)點(diǎn)類型
- 3.3 Stat 結(jié)構(gòu)體
- 3.4 監(jiān)聽器原理
- 3.5 寫數(shù)據(jù)流程
- 4. Zookeeper 實(shí)戰(zhàn)
- 4.1 客戶端命令行操作
- 4.2 API 操作
- 4.3.1 IDEA 環(huán)境搭建
- 4.3.2 創(chuàng)建 ZooKeeper 客戶端
- 4.3.4 獲取子節(jié)點(diǎn)并監(jiān)聽節(jié)點(diǎn)變化
- 4.3.5 判斷 Znode 是否存在
- 4.3 監(jiān)聽服務(wù)器節(jié)點(diǎn)動(dòng)態(tài)上下線案例
- 5 zookeeper框架
- 5.1 org.apache.zookeeper
- 5.2 zkclient
- 5.2.1 簡(jiǎn)介
- 5.2.2 Maven依賴
- 5.2.3 ZkClient 的設(shè)計(jì)
- 5.2.4 重要處理流程說明
- 5.2.5 客戶端處理變更(Watcher通知)
- 5.2.6 序列化處理
- 5.2.7 ZkClient如何解決使用ZooKeeper客戶端遇到的問題的呢?
- 5.2.8 API介紹
- 5.2.9 demo
- 5.3 Curator
- 5.3.1 簡(jiǎn)介
- 5.3.2 版本問題
- 5.3.3 CuratorFramework
- 5.3.4 curator-recipes
- 5.3.5 知識(shí)點(diǎn)
- 5.3.6 Maven依賴
- 5.3.7 api
- 5.3.8 使用Curator高級(jí)API特性之Cache緩存監(jiān)控節(jié)點(diǎn)變化
- 5.4 使用Curator創(chuàng)建/驗(yàn)證ACL(訪問權(quán)限列表)
- 5.4.1 連通Zk時(shí),就指定登錄權(quán)限
- 5.4.2寫一個(gè)把明文的賬號(hào)密碼轉(zhuǎn)換為加密后的密文的工具類
- 5.4.3使用自定義工具類AclUtils,一次性給多個(gè)用戶賦Acl權(quán)限
- 5.4.4級(jí)聯(lián)創(chuàng)建節(jié)點(diǎn),并賦予節(jié)點(diǎn)操作權(quán)限
- 5.4.5讀取節(jié)點(diǎn)數(shù)據(jù)
- 5.4.6修改具有ACL權(quán)限節(jié)點(diǎn)的data數(shù)據(jù)
- 5.4.7兩種方法判斷node節(jié)點(diǎn)是否存(優(yōu)先使用第一種)
- 7 分布式鎖
- 7.1.重入式排它鎖InterProcessMutex
- 7.2.不可重入排它鎖InterProcessSemaphoreMutex
- 7.3.可重入讀寫鎖InterProcessReadWriteLock 、InterProcessLock
- 7.4.多鎖對(duì)象容器(多共享鎖) ,將多個(gè)鎖作為單個(gè)實(shí)體管理,InterProcessMultiLock、InterProcessLock
- 7.5.代碼
- 8.分布式計(jì)數(shù)器
1. Zookeeper 入門
1.1 概述
Zookeeper 是一個(gè)開源的分布式的,為分布式應(yīng)用提供協(xié)調(diào)服務(wù)的 Apache 項(xiàng)目。
??Zookeeper 從設(shè)計(jì)模式角度來理解:是一個(gè)基于觀案者模式設(shè)計(jì)的分布式服務(wù)管理框架,它負(fù)責(zé)存儲(chǔ)和管理大家都關(guān)心的數(shù)據(jù),然后接受觀察者的注冊(cè),一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper 就將負(fù)責(zé)通知已經(jīng)在 Zookeeper 上注冊(cè)的那些觀察者做出相應(yīng)的反應(yīng)。
??
1.2 特點(diǎn)
1)Zookeeper:一個(gè)領(lǐng)導(dǎo)者(Leader) ,多個(gè)跟隨者(Follower)組成的集群。
2)集群中只要有半數(shù)以上節(jié)點(diǎn)存活,Zookeeper 集群就能正常服務(wù)。
3)全局?jǐn)?shù)據(jù)一致:每個(gè) Server 保存一份相同的數(shù)據(jù)副本,Client 無論連接到哪個(gè) Server,數(shù)據(jù)都是一致的。
4)更新請(qǐng)求順序進(jìn)行,來自同一個(gè) Client 的更新請(qǐng)求按其發(fā)送順序依次執(zhí)行。
5)數(shù)據(jù)更新原子性,一次數(shù)據(jù)更新要么成功,要么失敗。
6)實(shí)時(shí)性,在一定時(shí)間范圍內(nèi),Client 能讀到最新數(shù)據(jù)。
1.3 數(shù)據(jù)結(jié)構(gòu)
ZooKeeper 數(shù)據(jù)模型的結(jié)構(gòu)與 Unix 文件系統(tǒng)很類似,整體上可以看作是一棵樹,每個(gè)節(jié)點(diǎn)稱做一個(gè) ZNode。每一個(gè) ZNode 默認(rèn)能夠存儲(chǔ) 1 MB 的數(shù)據(jù),每個(gè) ZNode 都可以通過其路徑唯一標(biāo)識(shí)。
1.4 應(yīng)用場(chǎng)景
提供的服務(wù)包括:統(tǒng)一命名服務(wù)、統(tǒng)一配置管理、統(tǒng)一集群管理、服務(wù)器節(jié)點(diǎn)動(dòng)態(tài)上下線、軟負(fù)載均衡等。
統(tǒng)一命名服務(wù)
在分布式環(huán)境下,經(jīng)常需要對(duì)應(yīng)用/服務(wù)進(jìn)行統(tǒng)一命名 ,便于識(shí)別。例如:IP 不容易記住,而域名容易記住。
統(tǒng)一配置管理
(1)分布式環(huán)境下,配置文件同步非常常見。
??① 一般要求一個(gè)集群中,所有節(jié)點(diǎn)的配置信息是一致的,比如 Kafka 集群。
??② 對(duì)配置文件修改后,希望能夠快速同步到各個(gè)節(jié)點(diǎn)上。
(2)配置管理可交由 ZooKeeper 實(shí)現(xiàn)。
??① 可將配置信息寫入 ZooKeeper 上的一個(gè) Znode 。
??② 各個(gè)客戶端服務(wù)器監(jiān)聽這個(gè) Znode。
??③ 一旦 Znode 中的數(shù)據(jù)被修改,ZooKeeper 將通知各個(gè)客戶端服務(wù)器。
統(tǒng)一集群管理
(1)分布式環(huán)境中,實(shí)時(shí)掌握每個(gè)節(jié)點(diǎn)的狀態(tài)是必要的。
??可根據(jù)節(jié)點(diǎn)實(shí)時(shí)狀態(tài)做出一些調(diào)整。
(2)ZooKeeper 可以實(shí)現(xiàn)實(shí)時(shí)監(jiān)控節(jié)點(diǎn)狀態(tài)變化
??① 可將節(jié)點(diǎn)信息寫入Z ooKeeper 上的一個(gè) ZNode。
??② 監(jiān)聽這個(gè) ZNode 可獲取它的實(shí)時(shí)狀態(tài)變化。
服務(wù)器動(dòng)態(tài)上下線
軟負(fù)載均衡
在 Zookeeper 中記錄每臺(tái)服務(wù)器的訪問數(shù),讓訪問數(shù)最少的服務(wù)器去處理最新的客戶端請(qǐng)求。
2. Zookeeper 安裝
2.1 下載地址
zookeeper 官網(wǎng)
2.2 本地模式安裝部署
準(zhǔn)備工作
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local/ mv apache-zookeeper-3.5.6-bin/ zookeeper mv zoo_sample.cfg zoo.cfg mkdir -p /usr/local/zookeeper/datavim zoo.cfg dataDir=/usr/local/zookeeper/datavim /etc/profile
在配置文件中添加以下內(nèi)容
#ZOOKEEPER
export ZOOKEEPER_HOME=/hadoop/zookeeper-3.5.6
export PATH=PATH:PATH:PATH:ZOOKEEPER_HOME/bin
source /etc/profile
啟動(dòng) Zookeeper
zkServer.sh start
啟動(dòng)客戶端
zkCli.sh
退出客戶端
quit
停止 Zookeeper
zkServer.sh stop
2.3 分布式安裝部署
集群規(guī)劃
在 master、slave1 和 slave2 三個(gè)節(jié)點(diǎn)上部署 Zookeeper。
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local
mv apache-zookeeper-3.5.6-bin/ zookeeper
同步 /usr/local/zookeeper 目錄內(nèi)容到 slave1、slave2
xsync zookeeper/
配置服務(wù)器編號(hào)
① 在 /usr/local/zookeeper/ 這個(gè)目錄下創(chuàng)建 zkData
mkdir data
② /usr/local/zookeeper/data 目錄下創(chuàng)建一個(gè) myid 的文件
touch myid
③ 編輯 myid 文件
vim myid
在文件中添加與 server 對(duì)應(yīng)的編號(hào):
0
④ 分發(fā)到其他機(jī)器上
xsync myid
并分別在 slave1、slave2 上修改 myid 文件中內(nèi)容為 1、2
配置 zoo.cfg 文件
① 將 /usr/local/zookeeper/conf 這個(gè)路徑下的 zoo_sample.cfg 修改為 zoo.cfg
mv zoo_sample.cfg zoo.cfg
② 打開 zoo.cfg 文件,修改 dataDir 路徑
dataDir=/usr/local/zookeeper/data
增加如下配置
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888
同步 zoo.cfg 配置文件
xsync zoo.cfg
修改環(huán)境變量
① 打開配置文件
vim /etc/profile
② 在配置文件中添加以下內(nèi)容
#ZOOKEEPER
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=PATH:PATH:PATH:ZOOKEEPER_HOME/bin
③ 同步配置文件
xsync /etc/profile
④ 使配置文件生效(三臺(tái)機(jī)器)
source /etc/profile
集群操作
① 三臺(tái)機(jī)器分別啟動(dòng) Zookeeper
zkServer.sh start
② 三臺(tái)機(jī)器分別關(guān)閉 Zookeeper
zkServer.sh stop
編寫 Zookeeper 的群起群關(guān)腳本
① 在 /usr/local/bin 目錄下創(chuàng)建 zk 文件
vim zk.sh
修改腳本 zk 具有執(zhí)行權(quán)限
chmod 777 zk.sh
調(diào)用腳本形式:zk start 或 zk stop
2.4 配置參數(shù)解讀
Zookeeper 中的配置文件 zoo.cfg 中參數(shù)含義解讀如下:
tickTime =2000:通信心跳數(shù),Zookeeper 服務(wù)器與客戶端心跳時(shí)間,單位毫秒
Zookeeper 使用的基本時(shí)間,服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時(shí)間間隔,也就是每個(gè)tickTime 時(shí)間就會(huì)發(fā)送一個(gè)心跳,時(shí)間單位為毫秒。它用于心跳機(jī)制,并且設(shè)置最小的 session 超時(shí)時(shí)間為兩倍心跳時(shí)間。(session 的最小超時(shí)時(shí)間是 2*tickTime)
initLimit =10:LF 初始通信時(shí)限
集群中的 Follower 跟隨者服務(wù)器與 Leader 領(lǐng)導(dǎo)者服務(wù)器之間初始連接時(shí)能容忍的最多心跳數(shù)(tickTime的數(shù)量),用它來限定集群中的 Zookeeper 服務(wù)器連接到 Leader 的時(shí)限。
syncLimit =5:LF 同步通信時(shí)限
集群中 Leader 與 Follower 之間的最大響應(yīng)時(shí)間單位,假如響應(yīng)超過 syncLimit * tickTime,Leader 認(rèn)為 Follwer 死掉,從服務(wù)器列表中刪除 Follwer。
dataDir:數(shù)據(jù)文件目錄+數(shù)據(jù)持久化路徑
主要用于保存 Zookeeper 中的數(shù)據(jù)。
clientPort =2181:客戶端連接端口
監(jiān)聽客戶端連接的端口。
server.A=B:C:D
A 是一個(gè)數(shù)字,表示這個(gè)是第幾號(hào)服務(wù)器;集群模式下配置一個(gè)文件 myid,這個(gè)文件在 dataDir 目錄下,這個(gè)文件里面有一個(gè)數(shù)據(jù)就是 A 的值,Zookeeper 啟動(dòng)時(shí)讀取此文件,拿到里面的數(shù)據(jù)與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個(gè)server。
B 是這個(gè)服務(wù)器的 ip 地址;
C 是這個(gè)服務(wù)器與集群中的 Leader 服務(wù)器交換信息的端口;
D 是萬一集群中的 Leader 服務(wù)器掛了,需要一個(gè)端口來重新進(jìn)行選舉,選出一個(gè)新的 Leader,而這個(gè)端口就是用來執(zhí)行選舉時(shí)服務(wù)器相互通信的端口。
3. Zookeeper 內(nèi)部原理
3.1 選舉機(jī)制
半數(shù)機(jī)制
集群中半數(shù)以上機(jī)器存活,集群可用。所以 Zookeeper 適合安裝奇數(shù)臺(tái)服務(wù)器。
Zookeeper 雖然在配置文件中并沒有指定 Master 和 Slave。但是,Zookeeper 工作時(shí),是有一個(gè)節(jié)點(diǎn)為 Leader,其他則為 Follower,Leader 是通過內(nèi)部的選舉機(jī)制臨時(shí)產(chǎn)生的。
選舉過程例子
假設(shè)有五臺(tái)服務(wù)器組成的 Zookeeper 集群,它們的 id 從1-5,同時(shí)它們都是最新啟動(dòng)的,也就是沒有歷史數(shù)據(jù),在存放數(shù)據(jù)量這一點(diǎn)上,都是一樣的。假設(shè)這些服務(wù)器依序啟動(dòng)。
① 服務(wù)器 1 啟動(dòng),此時(shí)只有它一臺(tái)服務(wù)器啟動(dòng)了,它發(fā)出去的報(bào)文沒有任何響應(yīng),所以它的選舉狀態(tài)一直是 LOOKING 狀態(tài)。
② 服務(wù)器 2 啟動(dòng),它與最開始啟動(dòng)的服務(wù)器 1 進(jìn)行通信,互相交換自己的選舉結(jié)果,由于兩者都沒有歷史數(shù)據(jù),所以 id 值較大的服務(wù)器 2 勝出,但是由于沒有達(dá)到超過半數(shù)以上的服務(wù)器都同意選舉它(這個(gè)例子中的半數(shù)以上是 3),所以服務(wù)器 1、2 還是繼續(xù)保持 LOOKING 狀態(tài)。
③ 服務(wù)器 3 啟動(dòng),根據(jù)前面的理論分析,服務(wù)器 3 成為服務(wù)器 1、2、3 中的老大,而與上面不同的是,此時(shí)有三臺(tái)服務(wù)器選舉了它,所以它成為了這次選舉的 Leader。
④ 服務(wù)器 4 啟動(dòng),根據(jù)前面的分析,理論上服務(wù)器4應(yīng)該是服務(wù)器 1、2、3、4 中最大的,但是由于前面已經(jīng)有半數(shù)以上的服務(wù)器選舉了服務(wù)器 3,所以它只能接收當(dāng)小弟的命了。
⑤ 服務(wù)器 5 啟動(dòng),同 4 一樣當(dāng)小弟。
3.2 節(jié)點(diǎn)類型
持久(Persistent)
客戶端和服務(wù)器端斷開連接后,創(chuàng)建的節(jié)點(diǎn)不刪除
短暫(Ephemeral)
客戶端和服務(wù)器端斷開連接后,創(chuàng)建的節(jié)點(diǎn)自己刪除
節(jié)點(diǎn)類型
① 持久化目錄節(jié)點(diǎn)
客戶端與 Zookeeper 斷開連接后,該節(jié)點(diǎn)依舊存在。
② 持久化順序編號(hào)目錄節(jié)點(diǎn)
客戶端與 Zookeeper 斷開連接后,該節(jié)點(diǎn)依舊存在,只是 Zookeeper 給該節(jié)點(diǎn)名稱進(jìn)行順序編號(hào)
③ 臨時(shí)目錄節(jié)點(diǎn)
客戶端與 Zookeeper 斷開連接后,該節(jié)點(diǎn)被刪除
④ 臨時(shí)順序編號(hào)目錄節(jié)點(diǎn)
客戶端與 Zookeeper 斷開連接后,該節(jié)點(diǎn)被刪除,只是 Zookeeper 給該節(jié)點(diǎn)名稱進(jìn)行順序編號(hào)。
說明: 創(chuàng)建 znode 時(shí)設(shè)置順序標(biāo)識(shí),znode 名稱后會(huì)附加一個(gè)值,順序號(hào)是一個(gè)單調(diào)遞增的計(jì)數(shù)器,由父節(jié)點(diǎn)維護(hù)。
注意: 在分布式系統(tǒng)中,順序號(hào)可以被用于為所有的事件進(jìn)行全局排序,這樣客戶端可以通過順序號(hào)推斷事件的順序。
3.3 Stat 結(jié)構(gòu)體
czxid: 創(chuàng)建節(jié)點(diǎn)的事務(wù) zxid
每次修改 ZooKeeper 狀態(tài)都會(huì)收到一個(gè) zxid 形式的時(shí)間戳,也就是 ZooKeepe r事務(wù) ID。
事務(wù) ID 是 ZooKeeper 中所有修改總的次序。每個(gè)修改都有唯一的 zxid,若 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前發(fā)生。
ctime: znode 被創(chuàng)建的毫秒數(shù)(從 1970 年開始)
mzxid: znode 最后更新的事務(wù) zxid
mtime: znode 最后修改的毫秒數(shù)(從 1970 年開始)
pZxid: znode 最后更新的子節(jié)點(diǎn) zxid
cversion : znode 子節(jié)點(diǎn)變化號(hào),znode 子節(jié)點(diǎn)修改次數(shù)
dataversion: znode 數(shù)據(jù)變化號(hào)
aclVersion: znode 訪問控制列表的變化號(hào)
ephemeralOwner: 如果是臨時(shí)節(jié)點(diǎn),這個(gè)是 znode 擁有者的 session id。如果不是臨時(shí)節(jié)點(diǎn)則是 0。
dataLength: znode 的數(shù)據(jù)長(zhǎng)度
numChildren: znode 子節(jié)點(diǎn)數(shù)量
3.4 監(jiān)聽器原理
監(jiān)聽原理詳解:
① 首先要有一個(gè) main() 線程
② 在 main 線程中創(chuàng)建 Zokeeper 客戶端,這時(shí)就會(huì)創(chuàng)建兩個(gè)線程,一個(gè)負(fù)責(zé)網(wǎng)絡(luò)連接通信(connet),一個(gè)負(fù)責(zé)監(jiān)聽(listener) 。
③ 通過 connect 線程將注冊(cè)的監(jiān)聽事件發(fā)送給 Zookeeper。
④ 在 Zookeeper 的注冊(cè)監(jiān)聽器列表中將注冊(cè)的監(jiān)聽事件添加到列表中。
⑤ Zookeeper 監(jiān)聽到有數(shù)據(jù)或路徑變化,就會(huì)將這個(gè)消息發(fā)送給 listener 線程。
⑥ listener 線程內(nèi)部調(diào)用了 process() 方法。
常見的監(jiān)聽
① 監(jiān)聽節(jié)點(diǎn)數(shù)據(jù)的變化
get -w path
② 監(jiān)聽子節(jié)點(diǎn)增減的變化
ls -w path
3.5 寫數(shù)據(jù)流程
4. Zookeeper 實(shí)戰(zhàn)
4.1 客戶端命令行操作
啟動(dòng)客戶端
zkCli.sh
顯示所有操作命令
help
查看當(dāng)前 znode 中所包含的內(nèi)容
ls /
ls2 /
查看當(dāng)前節(jié)點(diǎn)詳細(xì)數(shù)據(jù)
ls -s /
分別創(chuàng)建 2 個(gè)普通節(jié)點(diǎn)
create /animals “dog”
create /animals/small “ant”
獲得節(jié)點(diǎn)的值
get /animals
get /animals/small
創(chuàng)建短暫節(jié)點(diǎn)
create -e /animals/big “elephant”
創(chuàng)建帶序號(hào)的節(jié)點(diǎn)
create -s /animals/middle “hourse”
修改節(jié)點(diǎn)數(shù)據(jù)值
set /animals/small “bug”
節(jié)點(diǎn)的值變化監(jiān)聽
① 在 slave1 主機(jī)上注冊(cè)監(jiān)聽 /animals 節(jié)點(diǎn)數(shù)據(jù)變化
get -w /animals
② 在 slave2 主機(jī)上修改 /animals 節(jié)點(diǎn)的數(shù)據(jù)
set /animals “cat”
③ 觀察 slave1 主機(jī)收到子節(jié)點(diǎn)變化的監(jiān)聽
節(jié)點(diǎn)的子節(jié)點(diǎn)變化監(jiān)聽(路徑變化)
① 在 slave1 主機(jī)上注冊(cè)監(jiān)聽 /animals 節(jié)點(diǎn)的子節(jié)點(diǎn)變化
ls -w /animals
② 在 slave2 主機(jī) /animals 節(jié)點(diǎn)上創(chuàng)建子節(jié)點(diǎn)
create /animals/mini “fly”
③ 觀察 slave1 主機(jī)收到子節(jié)點(diǎn)變化的監(jiān)聽
刪除節(jié)點(diǎn)
delete /animals/big
遞歸刪除節(jié)點(diǎn)
deleteall /animals/mini
查看節(jié)點(diǎn)狀態(tài)
stat /animals
4.2 API 操作
4.3.1 IDEA 環(huán)境搭建
創(chuàng)建一個(gè) Maven 工程
在 pom 文件中添加依賴
在項(xiàng)目的 src/main/resources 目錄下,新建一個(gè)文件,命名為 “l(fā)og4j.properties”,在文件中填入:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4.3.2 創(chuàng)建 ZooKeeper 客戶端
@SpringBootTest
public class ZookeeperTest {
}
4.3.3 創(chuàng)建子節(jié)點(diǎn)
先將上面的 init() 方法前面的注解 @Test 改為 @BeforeAll
// 創(chuàng)建子節(jié)點(diǎn)
@SpringBootTest public class ZookeeperTest {private static String connectString = "localhost:2181";private static int sessionTimeout = 2000;private static ZooKeeper zkClient;@BeforeAllpublic static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {}});}@Testpublic void createNode() throws Exception { // 參數(shù)1:要?jiǎng)?chuàng)建的節(jié)點(diǎn)的路徑; 參數(shù)2:節(jié)點(diǎn)數(shù)據(jù) ; 參數(shù)3:節(jié)點(diǎn)權(quán)限 ;參數(shù)4:節(jié)點(diǎn)的類型String path = zkClient.create("/demo1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println(path);} }4.3.4 獲取子節(jié)點(diǎn)并監(jiān)聽節(jié)點(diǎn)變化
// 獲取子節(jié)點(diǎn)并監(jiān)聽節(jié)點(diǎn)變化 @SpringBootTest public class WatchTest {private static String connectString = "localhost:2181";private static int sessionTimeout = 2000;private static ZooKeeper zkClient;@Testpublic void getChildrenAndWatch() throws Exception {List<String> children = zkClient.getChildren("/", true);for (String child : children) {System.out.println(child);}// 延時(shí)阻塞Thread.sleep(Long.MAX_VALUE);}@BeforeAllpublic static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {List<String> children = null;try {children = zkClient.getChildren("/", true);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}for (String child : children) {System.out.println(child);}}});} }4.3.5 判斷 Znode 是否存在
// 判斷znode是否存在
@Test
public void exist() throws Exception {
Stat stat = zkClient.exists(“/animals”, false);
System.out.println(stat == null ? “not exist” : “exist”);
}
4.3 監(jiān)聽服務(wù)器節(jié)點(diǎn)動(dòng)態(tài)上下線案例
需求
某分布式系統(tǒng)中,主節(jié)點(diǎn)可以有多臺(tái),可以動(dòng)態(tài)上下線,任意一臺(tái)客戶端都能實(shí)時(shí)感知到主節(jié)點(diǎn)服務(wù)器的上下線。
需求分析
代碼實(shí)現(xiàn)
① 先在集群上創(chuàng)建 /servers 節(jié)點(diǎn)
create /servers “servers”
② 服務(wù)器端向 Zookeeper 注冊(cè)代碼
package zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient;public static void main(String[] args) throws Exception {args = new String[]{"slave1"};DistributeServer server = new DistributeServer();// 1.連接zookeeper集群server.getConnect();// 2.注冊(cè)節(jié)點(diǎn)server.register(args[0]);// 3.業(yè)務(wù)邏輯處理server.business(); }private void getConnect() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {}}); }private void register(String hostname) throws KeeperException, InterruptedException {String path = zkClient.create("/servers/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + " is online"); }private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE); }}
③ 客戶端代碼
package zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient;public static void main(String[] args) throws Exception {DistributeClient client = new DistributeClient();// 1.連接zookeeper集群client.getConnect();// 2.注冊(cè)監(jiān)聽client.getChildren();// 3.業(yè)務(wù)邏輯處理client.business(); }private void getConnect() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {try {getChildren();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}); }private void getChildren() throws KeeperException, InterruptedException {List<String> children = zkClient.getChildren("/servers", true);// 存儲(chǔ)服務(wù)器節(jié)點(diǎn)主機(jī)名稱集合ArrayList<String> hosts = new ArrayList<String>();for (String child : children) {byte[] data = zkClient.getData("/servers/" + child, false, null);hosts.add(new String(data));}System.out.println(hosts); }private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE); }}
5 zookeeper框架
5.1 org.apache.zookeeper
5.2 zkclient
5.2.1 簡(jiǎn)介
ZkClient 是由 Datameer 的工程師開發(fā)的開源客戶端,對(duì) Zookeeper 的原生 API 進(jìn)行了包裝,實(shí)現(xiàn)了超時(shí)重連、Watcher 反復(fù)注冊(cè)等功能。
在使用 ZooKeeper 的 Java 客戶端時(shí),經(jīng)常需要處理幾個(gè)問題:重復(fù)注冊(cè) watcher、session失效重連、異常處理。
IZKConnection:是一個(gè)ZkClient與Zookeeper之間的一個(gè)適配器;在代碼里直接使用的是ZKClient,實(shí)質(zhì)上還是委托了zookeeper來處理了。
在ZKClient中,根據(jù)事件類型,分為
節(jié)點(diǎn)事件(數(shù)據(jù)事件),對(duì)應(yīng)的事件處理器是IZKDataListener;
子節(jié)點(diǎn)事件,對(duì)應(yīng)的事件處理器是IZKChildListener;
Session事件,對(duì)應(yīng)的事件處理器是IZKStatusListener;
ZkEventThread:是專門用來處理事件的線程
目前已經(jīng)運(yùn)用到了很多項(xiàng)目中,知名的有 Dubbo、Kafka、Helix。
5.2.2 Maven依賴
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version> </dependency>5.2.3 ZkClient 的設(shè)計(jì)
從上述結(jié)構(gòu)上看,IZKConnection 是一個(gè) ZkClient 與 ZooKeeper 之間的一個(gè)適配器。在代碼里直接使用的是 ZKClient,其實(shí)質(zhì)還是委托了 zookeeper 來處理了。
使用 ZooKeeper 客戶端來注冊(cè) watcher 有幾種方法: 1、創(chuàng)建 ZooKeeper 對(duì)象時(shí)指定默認(rèn)的 Watcher,2、getData(),3、exists(),4、 getchildren。其中 getdata,exists 注冊(cè)的是某個(gè)節(jié)點(diǎn)的事件處理器(watcher),getchildren 注冊(cè)的是子節(jié)點(diǎn)的事件處理器(watcher)。而在 ZKClient 中,根據(jù)事件類型,分為了節(jié)點(diǎn)事件(數(shù)據(jù)事件)、子節(jié)點(diǎn)事件。對(duì)應(yīng)的事件處理器則是 IZKDataListener 和 IZKChildListener。另外加入了 Session 相關(guān)的事件和事件處理器。
ZkEventThread 是專門用來處理事件的線程。
5.2.4 重要處理流程說明
啟動(dòng) ZKClient
在創(chuàng)建 ZKClient 對(duì)象時(shí),就完成了到 ZooKeeper 服務(wù)器連接的建立。具體過程是這樣的:
啟動(dòng)時(shí),指定好 connection string,連接超時(shí)時(shí)間,序列化工具等。
創(chuàng)建并啟動(dòng) eventThread,用于接收事件,并調(diào)度事件監(jiān)聽器 Listener 的執(zhí)行。
連接到 zookeeper 服務(wù)器,同時(shí)將 ZKClient 自身作為默認(rèn)的 Watcher。
為節(jié)點(diǎn)注冊(cè)Watcher:
ZooKeeper 的三個(gè)方法:getData、getChildren、exists.
ZKClient 都提供了相應(yīng)的代理方法。就拿 exists 來看:
可以看到,是否注冊(cè) watcher,由 hasListeners(path)來決定的。
hasListeners 就是看有沒有與該數(shù)據(jù)節(jié)點(diǎn)綁定的 listener。
所以,默認(rèn)情況下,都會(huì)自動(dòng)的為指定的 path 注冊(cè) watcher,并且是默認(rèn)的 watcher (ZKClient)。怎么才能讓 hasListeners 判定值為 true 呢,也就是怎么才能為 path 綁定 Listener 呢?
ZKClient提供了訂閱功能:
一個(gè)新建的會(huì)話,只需要在取得響應(yīng)的數(shù)據(jù)節(jié)點(diǎn)后,調(diào)用 subscribteXxx 就可以訂閱上相應(yīng)的事件了。
5.2.5 客戶端處理變更(Watcher通知)
前面已經(jīng)知道,ZKClient 是默認(rèn)的 Watcher,并且在為各個(gè)數(shù)據(jù)節(jié)點(diǎn)注冊(cè)的 Watcher 都是這個(gè)默認(rèn)的 Watcher。那么該是如何將各種事件通知給相應(yīng)的 Listener 呢?
處理過程大致可以概括為下面的步驟:
判斷變更類型:變更類型分為 State 變更、ChildNode 變更(創(chuàng)建子節(jié)點(diǎn)、刪除子節(jié)點(diǎn)、修改子節(jié)點(diǎn)數(shù)據(jù))、NodeData 變更(創(chuàng)建指定 node,刪除節(jié)點(diǎn),節(jié)點(diǎn)數(shù)據(jù)變更)。
取出與 path 關(guān)聯(lián)的 Listeners,并為每一個(gè) Listener 創(chuàng)建一個(gè) ZKEvent,將 ZkEvent 交給 ZkEventThread 處理。
ZkEventThread 線程,拿到 ZkEvent 后,只需要調(diào)用 ZkEvent 的 run 方法進(jìn)行處理。 從這里也可以知道,具體的怎么如何調(diào)用 Listener,還要依賴于 ZkEvent 的 run()實(shí)現(xiàn)了。
注冊(cè)監(jiān)聽 watcher:
| IZkChildListener(子節(jié)點(diǎn)) | ZkClient的subscribeChildChanges方法 | ZkClient 的unsubscribeChildChanges 方法 |
| IZkDataListener(數(shù)據(jù)) | ZkClient 的subscribeDataChanges 方法 | ZkClient 的 unsubscribeDataChanges 方法 |
| IZkStateListener(客戶端狀 態(tài)) | ZkClient 的 subscribeStateChanges 方 法 | ZkClient 的 unsubscribeStateChanges 方法 |
在 ZkClient 中客戶端可以通過注冊(cè)相關(guān)的事件監(jiān)聽來實(shí)現(xiàn)對(duì) Zookeeper 服務(wù)端時(shí)間的訂閱。
其中 ZkClient 提供的監(jiān)聽事件接口有以下幾種:
其中 ZkClient 還提供了一個(gè) unsubscribeAll 方法,來解除所有監(jiān)聽。
Zookeeper 中提供的變更操作有:節(jié)點(diǎn)的創(chuàng)建、刪除,節(jié)點(diǎn)數(shù)據(jù)的修改:
創(chuàng)建操作,數(shù)據(jù)節(jié)點(diǎn)分為四種,ZKClient 分別為他們提供了相應(yīng)的代理:
刪除節(jié)點(diǎn)的操作:
修改節(jié)點(diǎn)數(shù)據(jù)的操作:
writeDataReturnStat():寫數(shù)據(jù)并返回?cái)?shù)據(jù)的狀態(tài)。
updateDataSerialized():修改已序列化的數(shù)據(jù)。執(zhí)行過程是:先讀取數(shù)據(jù),然后使用DataUpdater 對(duì)數(shù)據(jù)修改,最后調(diào)用 writeData 將修改后的數(shù)據(jù)發(fā)送給服務(wù)端。
5.2.6 序列化處理
ZooKeeper 中,會(huì)涉及到序列化、反序列化的操作有兩種:getData、setData。在 ZKClient 中,分別用 readData、writeData 來替代了。
對(duì)于 readData:先調(diào)用 zookeeper 的 getData,然后進(jìn)行使用 ZKSerializer 進(jìn)行反序列化工 作。
對(duì)于 writeData:先使用 ZKSerializer 將對(duì)象序列化后,再調(diào)用 zookeeper 的 setData。
5.2.7 ZkClient如何解決使用ZooKeeper客戶端遇到的問題的呢?
Watcher 自動(dòng)重注冊(cè):這個(gè)要是依賴于 hasListeners()的判斷,來決定是否再次注冊(cè)。如果對(duì)此有不清晰的,可以看上面的流程處理的說明。
Session 失效重連:如果發(fā)現(xiàn)會(huì)話過期,就先關(guān)閉已有連接,再重新建立連接。
異常處理:對(duì)比 ZooKeeper 和 ZKClient,就可以發(fā)現(xiàn) ZooKeeper 的所有操作都是拋異常 的,而 ZKClient 的所有操作,都不會(huì)拋異常的。在發(fā)生異常時(shí),它或做日志,或返回空, 或做相應(yīng)的 Listener 調(diào)用。
相比于 ZooKeeper 官方客戶端,使用 ZKClient 時(shí),只需要關(guān)注實(shí)際的 Listener 實(shí)現(xiàn)即可。所 以這個(gè)客戶端,還是推薦大家使用的。
https://www.cnblogs.com/jinchengll/p/12333213.html
5.2.8 API介紹
啟動(dòng)ZKClient:在創(chuàng)建ZKClient對(duì)象時(shí),就完成了到ZooKeeper服務(wù)器連接的建立
1、啟動(dòng)時(shí),制定好connection string,連接超時(shí)時(shí)間,序列化工具等
2、創(chuàng)建并啟動(dòng)eventThread,用于接收事件,并調(diào)度事件監(jiān)聽器Listener的執(zhí)行
3、連接到Zookeeper服務(wù)器,同時(shí)將ZKClient自身作為默認(rèn)的Watcher
為節(jié)點(diǎn)注冊(cè)Watcher
Zookeeper 原始API的三個(gè)方法:getData,getChildren、exists,ZKClient都提供了相應(yīng)的代理方法,比如exists,
hasListeners是看有沒有與該數(shù)據(jù)節(jié)點(diǎn)綁定的listener
所以,默認(rèn)情況下,都會(huì)自動(dòng)的為指定的path注冊(cè)watcher,并且是默認(rèn)的watcher(ZKClient),那么怎樣才能讓hasListeners值為true呢,也就是怎么才能為path綁定Listener呢?
ZKClient提供了訂閱功能,一個(gè)新建的會(huì)話,只需要在取得響應(yīng)的數(shù)據(jù)節(jié)點(diǎn)后,調(diào)用subscribeXXX就可以訂閱上相應(yīng)的事件了。
5.2.9 demo
Watcher
public class ZkClientWatcher {ZkClient zkClient;public ZkClientWatcher() {zkClient = new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);}public void createPersistent(String path, Object data) {zkClient.createPersistent(path, data);}public void writeData(String path, Object object) {zkClient.writeData(path, object);}public void delete(String path) {zkClient.delete(path);}public boolean exists(String path) {return zkClient.exists(path);}public void deleteRecursive(String path) {zkClient.deleteRecursive(path);}//對(duì)父節(jié)點(diǎn)添加監(jiān)聽數(shù)據(jù)變化。public void subscribe(String path) {zkClient.subscribeDataChanges(path, new IZkDataListener() {@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {System.out.printf("變更的節(jié)點(diǎn)為:%s,數(shù)據(jù):%s\r\n", dataPath, data);}@Overridepublic void handleDataDeleted(String dataPath) throws Exception {System.out.printf("刪除的節(jié)點(diǎn)為:%s\r\n", dataPath);}});}//對(duì)父節(jié)點(diǎn)添加監(jiān)聽子節(jié)點(diǎn)變化。public void subscribe2(String path) {zkClient.subscribeChildChanges(path, new IZkChildListener() {@Overridepublic void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {System.out.println("父節(jié)點(diǎn): " + parentPath + ",子節(jié)點(diǎn):" + currentChilds + "\r\n");}});}//客戶端狀態(tài)public void subscribe3(String path) {zkClient.subscribeStateChanges(new IZkStateListener() {@Overridepublic void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {if (state == Watcher.Event.KeeperState.SyncConnected) {//當(dāng)我重新啟動(dòng)后start,監(jiān)聽觸發(fā)System.out.println("連接成功");} else if (state == Watcher.Event.KeeperState.Disconnected) {System.out.println("連接斷開");//當(dāng)我在服務(wù)端將zk服務(wù)stop時(shí),監(jiān)聽觸發(fā)} elseSystem.out.println("其他狀態(tài)" + state);}@Overridepublic void handleNewSession() throws Exception {System.out.println("重建session");}@Overridepublic void handleSessionEstablishmentError(Throwable error) throws Exception {}});}/* @Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {}@Overridepublic void handleDataDeleted(String dataPath) throws Exception {}*/ } public class ZkClientWatcherTest {public static void main(String[] args) throws InterruptedException {ZkClientWatcher zkClientWatche=new ZkClientWatcher();String path="/root";zkClientWatche.deleteRecursive(path);zkClientWatche.createPersistent(path,"hello");zkClientWatche.subscribe(path);zkClientWatche.subscribe2(path);// zkClientWatche.subscribe3(path);//需要啟服務(wù)// Thread.sleep(Integer.MAX_VALUE);zkClientWatche.createPersistent(path+"/root2","word");TimeUnit.SECONDS.sleep(1);zkClientWatche.writeData(path,"hi");TimeUnit.SECONDS.sleep(1);//zkClientWatche.delete(path);//如果目錄下有內(nèi)容 不能刪除 會(huì)報(bào) Directory not empty for /root的異常zkClientWatche.deleteRecursive(path);TimeUnit.SECONDS.sleep(1); //這個(gè)main線程就結(jié)束} } public class ZookeeperUtil {/** zookeeper服務(wù)器地址 */ // public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";public static final String connectString = "localhost:2181";/** 定義session失效時(shí)間 */public static final int sessionTimeout = 5000;public static final String path = "/root"; }5.3 Curator
5.3.1 簡(jiǎn)介
zookeeper不是為高可用性設(shè)計(jì)的,但它使用ZAB協(xié)議達(dá)到了極高的一致性。所以它經(jīng)常被選作注冊(cè)中心、配置中心、分布式鎖等場(chǎng)景。
它的性能是非常有限的,而且API并不是那么好用。xjjdog傾向于使用基于Raft協(xié)議的Etcd或者Consul,它們更加輕量級(jí)一些。
Curator是netflix公司開源的一套zookeeper客戶端,目前是Apache的頂級(jí)項(xiàng)目。與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡(jiǎn)化了Zookeeper客戶端的開發(fā)量。Curator解決了很多zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊(cè)wathcer和NodeExistsException 異常等。
Zookeeper 原生API問題:
1.超時(shí)重連,不支持自動(dòng),需要手動(dòng)操作
2.Watch注冊(cè)一次后會(huì)失效
3.不支持遞歸創(chuàng)建節(jié)點(diǎn)
Zookeeper API 升級(jí)版 Curator:
1.解決watcher的注冊(cè)一次就失效
2.提供更多解決方案并且實(shí)現(xiàn)簡(jiǎn)單
3.提供常用的ZooKeeper工具類
4.編程風(fēng)格更爽,點(diǎn)點(diǎn)點(diǎn)就可以了
5.可以遞歸創(chuàng)建節(jié)點(diǎn)等
Curator由一系列的模塊構(gòu)成,對(duì)于一般開發(fā)者而言,常用的是curator-framework和curator-recipes。
5.3.2 版本問題
Curator2.x.x版本兼容Zookeeper的3.4.x和3.5.x。
Curator3.x.x只兼容Zookeeper 3.5.x,并且提供了一些諸如動(dòng)態(tài)重新配置、watch刪除等新特性。
Curator4 統(tǒng)一對(duì) ZooKeeper 3.4.x 和 3.5.x 的支持
5.3.3 CuratorFramework
Curator-Framework是ZooKeeper Client更高的抽象API,最佳核心的功能就是自動(dòng)連接管理:
當(dāng)ZooKeeper客戶端內(nèi)部出現(xiàn)異常, 將自動(dòng)進(jìn)行重連或重試, 該過程對(duì)外幾乎完全透明
監(jiān)控節(jié)點(diǎn)數(shù)據(jù)變化事件NodeDataChanged,需要時(shí)調(diào)用updateServerList()方法
Curator recipes自動(dòng)移除監(jiān)控
更加清晰的API
簡(jiǎn)化了ZooKeeper原生的方法, 事件等, 提供流式fluent的接口,提供Recipes實(shí)現(xiàn) : 選舉,共享鎖, 路徑cache, 分布式隊(duì)列,分布式優(yōu)先隊(duì)列等。
5.3.4 curator-recipes
curator-recipes:封裝了一些高級(jí)特性,如:Cache事件監(jiān)聽、 Elections選舉、分布式鎖、分布式計(jì)數(shù)器、分布式Barrier、Queues隊(duì)列等
5.3.5 知識(shí)點(diǎn)
1.使用curator建立與zk的連接
2.使用curator添加/遞歸添加節(jié)點(diǎn)
3.使用curator刪除/遞歸刪除節(jié)點(diǎn)
4.使用curator創(chuàng)建/驗(yàn)證 ACL(訪問權(quán)限列表)
5.使用curator監(jiān)聽 單個(gè)/父 節(jié)點(diǎn)的變化(watch事件)
6.基于curator實(shí)現(xiàn)Zookeeper分布式鎖(需要掌握基本的多線程知識(shí))
7.基于curator實(shí)現(xiàn)分布式計(jì)數(shù)器
5.3.6 Maven依賴
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><!--建議和本地安裝版本保持一致--><version>3.7.0</version> </dependency> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.2.0</version> </dependency> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version> </dependency>5.3.7 api
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class ZkConnectCuratorUtil {final static Logger log = LoggerFactory.getLogger(ZkConnectCuratorUtil.class);public CuratorFramework zkClient = null; //zk的客戶端工具Curator(在本類通過new實(shí)例化的是,自動(dòng)start)private static final int MAX_RETRY_TIMES = 3; //定義失敗重試次數(shù)private static final int BASE_SLEEP_TIME_MS = 5000; //連接失敗后,再次重試的間隔時(shí)間 單位:毫秒private static final int SESSION_TIME_OUT = 1000000; //會(huì)話存活時(shí)間,根據(jù)業(yè)務(wù)靈活指定 單位:毫秒private static final String ZK_SERVER_IP_PORT = "localhost:2181";//Zookeeper服務(wù)所在的IP和客戶端端口private static final String NAMESPACE = "workspace";//指定后,默認(rèn)操作的所有的節(jié)點(diǎn)都會(huì)在該工作空間下進(jìn)行//本類通過new ZkCuratorUtil()時(shí),自動(dòng)連通zkClientpublic ZkConnectCuratorUtil() {RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRY_TIMES, BASE_SLEEP_TIME_MS);//首次連接失敗后,重試策略zkClient = CuratorFrameworkFactory.builder()//.authorization("digest", "root:root".getBytes())//登錄超級(jí)管理(需單獨(dú)配).connectString(ZK_SERVER_IP_PORT).sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy).namespace(NAMESPACE).build();zkClient.start();}public void closeZKClient() {if (zkClient != null) {this.zkClient.close();}}public static void main(String[] args) {ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil();boolean ifStarted=zkUtil.zkClient.isStarted();System.out.println("當(dāng)前客戶的狀態(tài):" + (ifStarted ? "連接中" : "已關(guān)閉"));zkUtil.closeZKClient();boolean ifClose = zkUtil.zkClient.isStarted();System.out.println("當(dāng)前客戶的狀態(tài):" + (ifClose ? "連接成功" : "已關(guān)閉"));} } public class CuratorDao {//使用curator(遞歸)添加節(jié)點(diǎn)//級(jí)聯(lián)創(chuàng)建節(jié)點(diǎn)(原生API不支持/后臺(tái)客戶端也不支持,但是Curator支持)public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception {zkClient.create().creatingParentContainersIfNeeded()//創(chuàng)建父節(jié)點(diǎn),如果需要的話.withMode(CreateMode.PERSISTENT) //指定節(jié)點(diǎn)是臨時(shí)的,還是永久的.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定節(jié)點(diǎn)的操作權(quán)限.forPath(nodePath, nodeData.getBytes());System.out.println(nodePath + "節(jié)點(diǎn)已成功創(chuàng)建…");}//使用curator(遞歸)刪除節(jié)點(diǎn)//刪除node節(jié)點(diǎn)及其子節(jié)點(diǎn)public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception {zkClient.delete().guaranteed() //保證刪除:如果刪除失敗,那么在后端還是繼續(xù)會(huì)刪除,直到成功.deletingChildrenIfNeeded() //級(jí)聯(lián)刪除子節(jié)點(diǎn)//.withVersion(1)//版本號(hào)可以據(jù)需使用.forPath(nodePath);System.out.println(nodePath + "節(jié)點(diǎn)已刪除成功…");}//使用curator更新節(jié)點(diǎn)數(shù)據(jù)//更新節(jié)點(diǎn)data數(shù)據(jù)public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception {zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本號(hào)據(jù)需使用,默認(rèn)可以不帶System.out.println(nodePath + "節(jié)點(diǎn)數(shù)據(jù)已修改成功…");}//使用curator查詢節(jié)點(diǎn)數(shù)據(jù)//查詢node節(jié)點(diǎn)數(shù)據(jù)public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = new Stat();byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath);System.out.println("節(jié)點(diǎn)" + nodePath + "的數(shù)據(jù)為" + new String(data));System.out.println("節(jié)點(diǎn)的版本號(hào)為:" + stat.getVersion());}//使用curator查詢節(jié)點(diǎn)的子節(jié)點(diǎn)//打印node子節(jié)點(diǎn)public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception {List<String> childNodes = zkClient.getChildren().forPath(parentNodePath);System.out.println("開始打印子節(jié)點(diǎn)");for (String str : childNodes) {System.out.println(str);}}//使用curator判斷節(jié)點(diǎn)是否存在//判斷node節(jié)點(diǎn)是否存在public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = zkClient.checkExists().forPath(nodePath);System.out.println(null == stat ? "節(jié)點(diǎn)不存在" : "節(jié)點(diǎn)存在");}/**************使用Curator高級(jí)API特性之Cache緩存監(jiān)控節(jié)點(diǎn)變化*************/@Testpublic void test() throws Exception {ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();CuratorFramework zkClient = zkUtil.zkClient; // CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui"); // CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test"); // CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi"); // CuratorDao.getNodeData(zkClient,"/xiaosi/test"); // CuratorDao.printChildNodes(zkClient, "/xiaosi");CuratorDao.checkNodeExists(zkClient, "/xiaosi");}}5.3.8 使用Curator高級(jí)API特性之Cache緩存監(jiān)控節(jié)點(diǎn)變化
cache是一種緩存機(jī)制,可以借助cache實(shí)現(xiàn)監(jiān)聽。
簡(jiǎn)單來說,cache在客戶端緩存了znode的各種狀態(tài),當(dāng)感知到zk集群的znode狀態(tài)變化,會(huì)觸發(fā)event事件,注冊(cè)的監(jiān)聽器會(huì)處理這些事件。
curator支持的cache種類有4種Path Cache,Node Cache,Tree Cache,Curator Cache
1)Path Cache
Path Cache用來觀察ZNode的子節(jié)點(diǎn)并緩存狀態(tài),如果ZNode的子節(jié)點(diǎn)被創(chuàng)建,更新或者刪除,那么Path Cache會(huì)更新緩存,并且觸發(fā)事件給注冊(cè)的監(jiān)聽器。
它是通過PathChildrenCache類來實(shí)現(xiàn)的,監(jiān)聽器注冊(cè)是通過PathChildrenCacheListener。
2)Node Cache
Node Cache用來觀察ZNode自身,如果ZNode節(jié)點(diǎn)本身被創(chuàng)建,更新或者刪除,那么Node Cache會(huì)更新緩存,并觸發(fā)事件給注冊(cè)的監(jiān)聽器。
它是通過NodeCache類來實(shí)現(xiàn)的,監(jiān)聽器對(duì)應(yīng)的接口為NodeCacheListener。
3)Tree Cache
Tree Cache是上兩種的合體,Tree Cache觀察的是自身+所有子節(jié)點(diǎn)的所有數(shù)據(jù),并緩存所有節(jié)點(diǎn)數(shù)據(jù)。
它是通過TreeCache類來實(shí)現(xiàn)的,監(jiān)聽器對(duì)應(yīng)的接口為TreeCacheListener。
4)Curator Cache ( requires ZooKeeper 3.6+)
Curator Cache,是在zk3.6新版本添加的特性,該版本的出現(xiàn)是為了逐步淘汰上面3監(jiān)聽。
它是通過CuratorCache類來實(shí)現(xiàn)的,監(jiān)聽器對(duì)應(yīng)的接口為CuratorCacheListener。
Curator一次性的watch
import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.WatchedEvent;public class MyCuratorWatcher implements CuratorWatcher {@Overridepublic void process(WatchedEvent event) throws Exception {System.out.println("觸發(fā)watcher,節(jié)點(diǎn)路徑為:" + event.getPath());switch (event.getType()) {case NodeCreated:break;default:break;}} }//一次性的watchpublic static void watchOnce(CuratorFramework zkClient,String nodePath) throws Exception {zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);}NodeCache監(jiān)聽當(dāng)前節(jié)點(diǎn)變化,通過NodeCacheListener接口持續(xù)監(jiān)聽節(jié)點(diǎn)的變化來實(shí)現(xiàn)
//持續(xù)監(jiān)聽的watchpublic static void watchForeverByNodeCache(CuratorFramework zkClient,String nodePath) throws Exception {final NodeCache nodeCache=new NodeCache(zkClient, nodePath);//把監(jiān)聽節(jié)點(diǎn),轉(zhuǎn)換為nodeCachenodeCache.start(false);//默認(rèn)為false 設(shè)置為true時(shí),會(huì)自動(dòng)把節(jié)點(diǎn)數(shù)據(jù)存放到nodeCache中;設(shè)置為false時(shí),初始化數(shù)據(jù)為空ChildData cacheData=nodeCache.getCurrentData(); if(null==cacheData) {System.out.println("NodeCache節(jié)點(diǎn)的初始化數(shù)據(jù)為空……");}else {System.out.println("NodeCache節(jié)點(diǎn)的初始化數(shù)據(jù)為"+new String(cacheData.getData()));}//設(shè)置循環(huán)監(jiān)聽nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData cdata=nodeCache.getCurrentData();if(null==cdata) {System.out.println("節(jié)點(diǎn)發(fā)生了變化,可能剛剛被刪除!");nodeCache.close();//關(guān)閉監(jiān)聽}else {String data=new String(cdata.getData());String path=nodeCache.getCurrentData().getPath();System.out.println("節(jié)點(diǎn)路徑"+path+"數(shù)據(jù)發(fā)生了變化,最新數(shù)據(jù)為:"+data);}}});}PathChildrenCache只監(jiān)聽子節(jié)點(diǎn)變化
通過PathChildrenCacheListener接口持續(xù)監(jiān)聽子節(jié)點(diǎn)來實(shí)現(xiàn)
TreeCache是上兩者的合體,既監(jiān)聽自身,也監(jiān)聽所有子節(jié)點(diǎn)變化
通過TreeCacheListener接口來實(shí)現(xiàn)
Curator Cache,是在zk3.6新版本添加的特性,Curator需5.+
它的出現(xiàn)是為了替換以上3個(gè)監(jiān)聽(NodeCache、PathCache、TreeCache),它通過CuratorCacheListener.builder().for**來選擇對(duì)應(yīng)的監(jiān)聽。最后再通過curatorCache.listenable().addListener(listener);注冊(cè)監(jiān)聽。
5.4 使用Curator創(chuàng)建/驗(yàn)證ACL(訪問權(quán)限列表)
5.4.1 連通Zk時(shí),就指定登錄權(quán)限
//本類代碼,只涉及ACL操作 public class CuratorAcl {public CuratorFramework client = null;public static final String workspace="workspace";public static final String zkServerPath = "192.168.31.216:2181";public CuratorAcl() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().authorization("digest", "mayun:mayun".getBytes())//通常情況下,登錄賬號(hào)、密碼可以通過構(gòu)造參數(shù)傳入,暫時(shí)固定,據(jù)需修改.connectString(zkServerPath).sessionTimeoutMs(20000).retryPolicy(retryPolicy).namespace(workspace).build();client.start();}public void closeZKClient() {if (client != null) {this.client.close();}} }5.4.2寫一個(gè)把明文的賬號(hào)密碼轉(zhuǎn)換為加密后的密文的工具類
//把明文的賬號(hào)密碼轉(zhuǎn)換為加密后的密文 public class AclUtils {public static String getDigestUserPwd(String loginId_Username_Passwd) {String digest = "";try {digest = DigestAuthenticationProvider.generateDigest(loginId_Username_Passwd);} catch (NoSuchAlgorithmException e) {e.printStackTrace();}return digest;}public static void main(String[] args) throws IOException, InterruptedException, KeeperException, Exception {String id = "mayun:mayun";String idDigested = getDigestUserPwd(id);System.out.println(idDigested); // mayun:KThXmEntEPZyHsQk7tbP5ZzEevk=} }5.4.3使用自定義工具類AclUtils,一次性給多個(gè)用戶賦Acl權(quán)限
public static List<ACL> getAcls() throws NoSuchAlgorithmException{List<ACL> acls=new ArrayList<ACL>();Id mayun =new Id("digest", AclUtils.getDigestUserPwd("mayun:mayun"));Id lilei =new Id("digest", AclUtils.getDigestUserPwd("lilei:lilei"));acls.add(new ACL(Perms.ALL, mayun));//給mayun一次性賦值所有權(quán)限acls.add(new ACL(Perms.READ, lilei));acls.add(new ACL(Perms.DELETE | Perms.CREATE, lilei));//給lilei分兩次賦權(quán)限(目的:看不同的賦權(quán)方式)return acls;}5.4.4級(jí)聯(lián)創(chuàng)建節(jié)點(diǎn),并賦予節(jié)點(diǎn)操作權(quán)限
public static void createNodesCascade(CuratorAcl cto,String nodePath,String nodeData,List<ACL> acls) throws Exception {String result=cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls, true)//給節(jié)點(diǎn)賦權(quán)限.forPath(nodePath, nodeData.getBytes());System.out.println("創(chuàng)建成功,result="+result); }5.4.5讀取節(jié)點(diǎn)數(shù)據(jù)
public void getNodeData(CuratorAcl cto,String nodePath) throws Exception {Stat stat = new Stat();byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);if(null!=stat) {System.out.println("節(jié)點(diǎn)" + nodePath + "的數(shù)據(jù)為: " + new String(data));System.out.println("該節(jié)點(diǎn)的版本號(hào)為: " + stat.getVersion());}}5.4.6修改具有ACL權(quán)限節(jié)點(diǎn)的data數(shù)據(jù)
public void modNodeDataWhichWithAcl(CuratorAcl cto,String nodePath,String nodeNewData) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("節(jié)點(diǎn)修改后的數(shù)據(jù)為:"+nodeNewData);cto.client.setData().forPath(nodePath, nodeNewData.getBytes());System.out.println("修改成功");}5.4.7兩種方法判斷node節(jié)點(diǎn)是否存(優(yōu)先使用第一種)
public void checkNodeExists(CuratorAcl cto,String nodePath) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("-----------=================-------------");//判斷節(jié)點(diǎn)是否存在,方法一(路徑前面會(huì)自動(dòng)添加workspace)Stat stat=cto.client.checkExists().forPath(nodePath);System.out.println("======="+stat==null?"不存在":"存在");//判斷節(jié)點(diǎn)是否存在,方法二(路徑前面需手動(dòng)添加workspace)Stat stat2 = cto.client.getZookeeperClient().getZooKeeper().exists("/"+workspace+nodePath, false);System.out.println("======="+stat2==null?"不存在":"存在");}ACL權(quán)限的main方法測(cè)試
通過java代碼給某個(gè)節(jié)點(diǎn)添加ACL權(quán)限后,后臺(tái)登陸zk客戶端時(shí),是無法直接操作該節(jié)點(diǎn)被ACL控制的權(quán)限的操作的,要想操作具有ACL權(quán)限的節(jié)點(diǎn),方法只有兩個(gè)。
1、知道該節(jié)點(diǎn)輸入用戶都有哪些,用這些用戶的賬號(hào)密碼登錄
2、使用超級(jí)用戶登錄
#getAcl /succ/testDigest 查看都有哪些用戶對(duì)該節(jié)點(diǎn)有操作權(quán)限
#addauth digest succ:succ 登錄
7 分布式鎖
Curator的5種分布式鎖及其對(duì)應(yīng)的核心類:
1.重入式排它鎖 Shared Reentrant Lock,實(shí)現(xiàn)類:InterProcessMutex
2.不可重入排它鎖 Shared Lock ,實(shí)現(xiàn)類:InterProcessSemaphoreMutex
3.可重入讀寫鎖 Shared Reentrant Read Write Lock,實(shí)現(xiàn)類: InterProcessReadWriteLock 、InterProcessLock
4.多鎖對(duì)象容器(多共享鎖) Multi Shared Lock,將多個(gè)鎖作為單個(gè)實(shí)體管理的容器,實(shí)現(xiàn)類:InterProcessMultiLock、InterProcessLock
5.共享信號(hào)鎖Shared Semaphore ,實(shí)現(xiàn)類:InterProcessSemaphoreV2
跨 JVM 工作的計(jì)數(shù)信號(hào)量。使用相同鎖路徑的所有 JVM 中的所有進(jìn)程將實(shí)現(xiàn)進(jìn)程間有限的租用集。此外,這個(gè)信號(hào)量大多是“公平的”——每個(gè)用戶將按照請(qǐng)求的順序獲得租用(從 ZK 的角度來看)。
有兩種模式可用于確定信號(hào)量的最大租用。在第一種模式中,最大租用是由給定路徑的用戶維護(hù)的約定。在第二種模式中,SharedCountReader 用作給定路徑的信號(hào)量的方法,以確定最大租用。
7.1.重入式排它鎖InterProcessMutex
public InterProcessMutex(CuratorFramework client, String path)
獲取/釋放鎖的API
public void acquire() throws Exception;//獲取鎖,獲取不到鎖一直阻塞,zk連接中斷則拋異常
public boolean acquire(long time, TimeUnit unit) throws Exception;//獲取鎖,超過該時(shí)間后,直接返回false,zk連接中斷則拋異常
public void release() throws Exception;//釋放鎖
通過release()方法釋放鎖。InterProcessMutex 實(shí)例可以重用。Revoking ZooKeeper recipes wiki定義了可協(xié)商的撤銷機(jī)制。為了撤銷mutex, 調(diào)用下面的方法
/**
- 將鎖設(shè)為可撤銷的. 當(dāng)別的進(jìn)程或線程想讓你釋放鎖時(shí)Listener會(huì)被調(diào)用。
- Parameters:
- listener - the listener
*/
public void makeRevocable(RevocationListener listener)
7.2.不可重入排它鎖InterProcessSemaphoreMutex
public InterProcessSemaphoreMutex(CuratorFramework client, String path)
使用InterProcessSemaphoreMutex,調(diào)用方法類似,區(qū)別在于該鎖是不可重入的,在同一個(gè)線程中不可重入
7.3.可重入讀寫鎖InterProcessReadWriteLock 、InterProcessLock
一個(gè)讀寫鎖管理一對(duì)相關(guān)的鎖。一個(gè)負(fù)責(zé)讀操作,另外一個(gè)負(fù)責(zé)寫操作。讀操作在寫鎖沒被使用時(shí)可同時(shí)由多個(gè)進(jìn)程使用,而寫鎖使用時(shí)不允許讀 (阻塞)。此鎖是可重入的。一個(gè)擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進(jìn)入寫鎖。這也意味著寫鎖可以降級(jí)成讀鎖, 比如請(qǐng)求寫鎖 —>讀鎖 —->釋放寫鎖。從讀鎖升級(jí)成寫鎖是不成的。
7.4.多鎖對(duì)象容器(多共享鎖) ,將多個(gè)鎖作為單個(gè)實(shí)體管理,InterProcessMultiLock、InterProcessLock
Multi Shared Lock是一個(gè)鎖的容器。當(dāng)調(diào)用acquire, 所有的鎖都會(huì)被acquire(上鎖),如果請(qǐng)求失敗,所有的鎖都會(huì)被release (釋放鎖)。同樣調(diào)用release時(shí)所有的鎖都被release(失敗被忽略)。基本上,它就是組鎖的代表,在它上面的請(qǐng)求釋放操作都會(huì)傳遞給它包含的所有的鎖。主要涉及兩個(gè)類:InterProcessMultiLock、InterProcessLock
它的構(gòu)造函數(shù)需要包含的鎖的集合,或者一組ZooKeeper的path。
public InterProcessMultiLock(List locks)
public InterProcessMultiLock(CuratorFramework client, List paths)
7.5.代碼
public class ZkLock {final static Logger log = LoggerFactory.getLogger(ZkLock.class);public CuratorFramework zkClient = null; // zk的客戶端工具Curator(在本類通過new實(shí)例化的是,自動(dòng)start)private static final int BASE_SLEEP_TIME_MS = 1000; // 連接失敗后,再次重試的間隔時(shí)間 單位:毫秒private static final int MAX_RETRY_TIMES = 10; // 定義失敗重試次數(shù)private static final int SESSION_TIME_OUT = 1000000; // 會(huì)話存活時(shí)間,根據(jù)業(yè)務(wù)靈活指定 單位:毫秒private static final String ZK_SERVER_IP_PORT = "localhost:2181";// Zookeeper服務(wù)所在的IP和客戶端端口private static final String NAMESPACE = "workspace";// 指定后,默認(rèn)操作的所有的節(jié)點(diǎn)都會(huì)在該工作空間下進(jìn)行static int j = 10;//初始化zk客戶端public ZkLock() {// 重試策略:初試時(shí)間為1s 重試10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRY_TIMES);// 通過工廠建立連接zkClient = CuratorFrameworkFactory.builder().connectString(ZK_SERVER_IP_PORT) // 連接地址.sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy)// 重試策略.build();zkClient.start();}public static void lockTest(CuratorFramework zkClient) throws InterruptedException {// 使用分布式鎖,所有系統(tǒng)同時(shí)監(jiān)聽同一個(gè)節(jié)點(diǎn),達(dá)到分布式鎖的目的final InterProcessMutex lock = new InterProcessMutex(zkClient, "/test");final CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 10; i++) {//啟動(dòng)10個(gè)線程new Thread(new Runnable() {@Overridepublic void run() {try {countDownLatch.await();// 線程等待一起執(zhí)行l(wèi)ock.acquire();// 分布式鎖,數(shù)據(jù)同步// 處理業(yè)務(wù)j--;System.out.println(j);} catch (Exception e) {e.printStackTrace();} finally {try {// 釋放鎖lock.release();} catch (Exception e) {e.printStackTrace();}}}}, "t" + i).start();}Thread.sleep(1000);countDownLatch.countDown();// 模擬十個(gè)線程一起并發(fā).指定一起執(zhí)行}public static void main(String[] args) throws InterruptedException {ZkLock zkl = new ZkLock();ZkLock.lockTest(zkl.zkClient);} }8.分布式計(jì)數(shù)器
利用Zookeeper可以實(shí)現(xiàn)一個(gè)集群共享的計(jì)數(shù)器。只要使用相同的path就可以得到最新的計(jì)數(shù)器值, 這是由ZooKeeper的一致性保證的。Curator有兩個(gè)計(jì)數(shù)器:DistributedAtomicInteger,DistributedAtomicLong。這個(gè)兩個(gè)除了計(jì)數(shù)范圍(int、long)不同外,沒有任何不同。操作也非常簡(jiǎn)單,跟AtomicInteger大同小異。
increment() //加1
decrement() //減1
compareAndSet(Integer expectedValue, Integer newValue) //cas操作
get() //獲取當(dāng)前值
add():增加特定的值
subtract(): 減去特定的值
trySet(): 嘗試設(shè)置計(jì)數(shù)值
使用的時(shí)候,必須檢查返回結(jié)果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。
另外Curator還有一些高端的用法:分布式屏障—Barrier、Double-barrier,分布式隊(duì)列DistributedQueueDistributed Queue
https://blog.csdn.net/succing/article/details/121779721
https://blog.csdn.net/succing/article/details/121793494
https://blog.csdn.net/succing/article/details/121844550
https://blog.csdn.net/succing/article/details/121802687
總結(jié)
以上是生活随笔為你收集整理的大数据之Zookeeper的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: deepTools对ChIP-seq数据
- 下一篇: Pascal voc 2012 数据集简