超全zookeeper知识点与实战
第1章 Zookeeper
1.1 概述
Zookeeper是一個開源的分布式的,為分布式應用提供協調服務的Apache項目。
配合其他服務器,
文件系統——存儲各種服務器上線信息
通知機制——客戶端跟zookeeper打招呼
1.2 特點
1)Zookeeper:一個領導者(Leader),多個跟隨者(Follower)組成的集群。
2)集群中只要有半數以上節點存活,Zookeeper集群就能正常服務。所以Zookeeper適合安裝奇數臺服務器。 (如果偶數就浪費了一臺集群)
3)全局數據一致:每個Server保存一份相同的數據副本,Client無論連接到哪個Server,數據都是一致的。
4)更新請求順序執行,來自同一個Client的更新請求按其發送順序依次執行。
5)數據更新原子性,一次數據更新要么成功,要么失敗。
(封裝成一個大的事務,要么整體成功,要么整體失敗)
6)實時性,在一定時間范圍內,Client能讀到最新數據。
(同步數據,速度非常快,因為zookeeper里面的數據非常小)
1.3 數據結構
數據結構
ZooKeeper數據模型的結構與Unix文件系統很類似,整體上可以看作是一棵樹,每個節點稱做一個ZNode。每一個ZNode默認能夠存儲1MB的數據,每個ZNode都可以通過其路徑唯一標識。
存儲的數據量比較小,存儲的內容有限
1.4 應用場景
提供的服務包括:統一命名服務、統一配置管理、統一集群管理、服務器節點動態上下線、軟負載均衡等。
統一命名服務
在分布式環境下,經常需要對應用/服務進行統一命名,便于識別。
例如:IP不容易記住,而域名容易記住。
統一配置管理
1)分布式環境下,配置文件同步非常常見。
(1)一般要求一個集群中,所有節點的配置信息是一致的,比如 Kafka 集群。
(2)對配置文件修改后,希望能夠快速同步到各個節點上。
2) 配置管理可交由ZooKeeper實現。
(1)可將配置信息寫入ZooKeeper上的一個Znode。
(2)各個客戶端服務器監聽這個Znode。
(3)一旦Znode中的數據被修改,ZooKeeper將通知各個客戶端服務器。
1)分布式環境中,實時掌握每個節點的狀態是必要的。
(1)可根據節點實時狀態做出一些調整。
2)ZooKeeper可以實現實時監控節點狀態變化
(1)可將節點運行狀態信息寫入ZooKeeper上的一個ZNode。
(2)監聽這個ZNode可獲取它的實時狀態變化。
相當于講客戶端所有的相關信息注冊上之后,進行統一的集群管理,以及集群狀態的好壞
服務器動態上下線
軟負載均衡
在Zookeeper中記錄每臺服務器的訪問數,讓訪問數最少的服務器去處理最新的客戶端請求
服務器注冊域名,下面有多臺服務器,多臺服務器有一個接客線程數,zookeeper根據每一個節點上對應的訪問數來進行軟負載均衡
1.5 下載地址
1)官網首頁:
https://zookeeper.apache.org/
2)下載截圖
選擇相對穩定的老版本
第2章 Zookeeper本地安裝
2.1 本地模式安裝部署
1)安裝前準備
(1)安裝Jdk
(2)拷貝Zookeeper安裝包到Linux系統下
先啟動三臺集群
(3)解壓到指定目錄
[leokadia@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
查看
改名
2)配置修改
(1)將/opt/module/zookeeper-3.5.7/conf這個路徑下的zoo_sample.cfg修改為zoo.cfg;
(2)打開zoo.cfg文件,修改dataDir路徑:
[leokadia@hadoop102 conf]$ vim zoo.cfg/tmp存儲的是臨時數據,到了一個月會被刪除掉,也就是到了1個月zookeeper數據都沒了,于是需要創建一個目錄進行保存,通常創建zkData
想把數據放在自己的框架下,于是在自己的目錄下創建目錄zkData
(3)在/opt/module/zookeeper-3.5.7/這個目錄上創建zkData文件夾
[leokadia@hadoop102 zookeeper-3.5.7]$ mkdir zkData
在配置文件中修改如下內容:
3)操作Zookeeper
(1)啟動Zookeeper
(2)查看進程是否啟動
[leokadia@hadoop102 zookeeper-3.5.7]$ jps(3)查看狀態:
[leokadia@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status!
(4)啟動客戶端:
(5)退出客戶端:
[zk: localhost:2181(CONNECTED) 0] quit(6)停止Zookeeper
[leokadia@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh stop2.2 配置參數解讀
Zookeeper中的配置文件zoo.cfg中參數含義解讀如下:
1)tickTime =2000:通信心跳數,Zookeeper服務器與客戶端心跳時間,單位毫秒
Zookeeper使用的基本時間,服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個tickTime時間就會發送一個心跳,時間單位為毫秒。
它用于心跳機制,并且設置最小的session超時時間為兩倍心跳時間。(session的最小超時時間是2*tickTime)
即客戶端與服務端,服務端與服務段2s中發一次通信
2)initLimit =10:LF初始通信時限
集群中的Follower跟隨者服務器與Leader領導者服務器之間初始連接時能容忍的最多心跳數(tickTime的數量),用它來限定集群中的Zookeeper服務器連接到Leader的時限。
初始化在10個心跳(20s)還沒有建立連接,這個通信就是失敗的
3)syncLimit =5:LF同步通信時限
集群中Leader與Follower之間的最大響應時間單位,假如響應超過syncLimit * tickTime,Leader認為Follwer死掉,從服務器列表中刪除Follwer。
4)dataDir:數據文件目錄+數據持久化路徑
主要用于保存Zookeeper中的數據。
注意:默認的tmp目錄,容易被Linux系統定期刪除,所以一般不用默認的tmp目錄。
5)clientPort =2181:客戶端連接端口
監聽客戶端連接的端口。通常情況下不做修改。
第3章 Zookeeper實戰(開發重點)
3.1 分布式安裝部署
1)集群規劃
在hadoop102、hadoop103和hadoop104三個節點上部署Zookeeper。
2)解壓安裝
(1)解壓Zookeeper安裝包到/opt/module/目錄下
之前本地做過了
(2)同步/opt/module/zookeeper-3.5.7目錄內容到hadoop103、hadoop104
[leokadia@hadoop102 module]$ xsync zookeeper-3.5.7/3)配置服務器編號
(1)在/opt/module/zookeeper-3.5.7/這個目錄下創建zkData
(2)在/opt/module/zookeeper-3.5.7/zkData目錄下創建一個myid的文件
[leokadia@hadoop102 zkData]$ touch myid添加myid文件,注意一定要在linux里面創建,在notepad++里面很可能亂碼
(3)編輯myid文件
在文件中添加與server對應的編號:
2
Hadoop102 配置2,hadoop103配置3,hadoop104配置4
(4)拷貝配置好的zookeeper到其他機器上
[leokadia@hadoop102 zkData]$ xsync myid如果前面沒分發,同步/opt/module/zookeeper-3.5.7目錄內容到hadoop103、hadoop104,當然也可以等到后面一起分發
[leokadia@hadoop102 module]$ xsync zookeeper-3.5.7/并分別在hadoop103、hadoop104上修改myid文件中內容為3、4
4)配置zoo.cfg文件
(1)重命名/opt/module/zookeeper-3.5.7/conf這個目錄下的zoo_sample.cfg為zoo.cfg
(2)打開zoo.cfg文件
[leokadia@hadoop102 conf]$ vim zoo.cfg修改數據存儲路徑配置(在上文已經做過了)
dataDir=/opt/module/zookeeper-3.5.7/zkData
增加如下配置
(3)同步zoo.cfg配置文件
[leokadia@hadoop102 conf]$ xsync zoo.cfg
檢查是否分發成功
hadoop103修改成功
(4)配置參數解讀
server.A=B:C:D。
A是一個數字,表示這個是第幾號服務器;
集群模式下配置一個文件myid,這個文件在dataDir目錄下,這個文件里面有一個數據就是A的值,Zookeeper啟動時讀取此文件,拿到里面的數據與zoo.cfg里面的配置信息比較從而判斷到底是哪個server。
B是這個服務器的地址;
C是這個服務器Follower與集群中的Leader服務器交換信息的端口;
D是萬一集群中的Leader服務器掛了,需要一個端口來重新進行選舉,選出一個新的Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。
5)集群操作
(1)分別啟動Zookeeper
注:如果只啟動一臺集群查看狀態,發現沒有啟動起來,因為此時是3臺服務器,目前只啟動一臺服務器,沒有達到超過半數,就不會選出對應的Leader,對應的集群就沒法工作,集群必須要有超過半數以上的服務器是好的,才能正常工作
如何知道是否超過半數了呢?根據配置文件剛剛配置文件中寫了有3臺服務器
此時我們啟動第二臺服務器,可以發現hadoop103成為了leader
再查看hadoop102,發現其成為了follower
此時再啟動hadoop104,查看狀態
發現104為follower
(2)查看狀態(見上圖)
[leokadia@hadoop102 zookeeper-3.5.7]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Mode: follower [leokadia@hadoop103 zookeeper-3.5.7]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Mode: leader [leokadia@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Mode: follower3.1.2 集群啟動停止腳本
由于如果集群較多的話,每一臺集群啟動和停止都需要輸入命令太過于繁瑣,故編寫腳本統一啟動和停止。
1)在 hadoop102 的/home/leokadia/bin 目錄下創建腳本
在腳本中編寫如下內容
#!/bin/bash case $1 in "start"){for i in hadoop102 hadoop103 hadoop104doecho ---------- zookeeper $i 啟動 ------------ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"done };; "stop"){for i in hadoop102 hadoop103 hadoop104doecho ---------- zookeeper $i 停止 ------------ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"done };; "status"){for i in hadoop102 hadoop103 hadoop104doecho ---------- zookeeper $i 狀態 ------------ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"done };; esac2)增加腳本執行權限
[leokadia@hadoop102 bin]$ chmod 777 zk.sh3)Zookeeper 集群啟動腳本
先啟動一下hadoop集群
啟動zk集群
[leokadia@hadoop102 bin]$ zk.sh start
查看各集群狀態:
4)Zookeeper 集群停止腳本
[leokadia@hadoop102 bin]$ zk.sh stop3.2 客戶端命令行操作
命令基本語法 功能描述
help 顯示所有操作命令
ls path 使用 ls 命令來查看當前znode的子節點
-w 監聽子節點變化
-s 附加次級信息
create 普通創建
-s 含有序列
-e 臨時(重啟或者超時消失)
get path 獲得節點的值
-w 監聽節點內容變化
-s 附加次級信息
set 設置節點的具體值
stat 查看節點狀態
delete 刪除節點
deleteall 遞歸刪除節點
先啟動集群
1)啟動客戶端
[leokadia@hadoop103 zookeeper-3.5.7]$ bin/zkCli.sh
啟動之后發現是個本地的客戶端,想要把它變成hadoop102或者103這種的客戶端,先輸入quit退出,再輸入
2)顯示所有操作命令
[zk: localhost:2181(CONNECTED) 1] help3)查看當前znode中所包含的內容
[zk: localhost:2181(CONNECTED) 0] ls / [zookeeper]4)查看當前節點詳細數據
[zk: localhost:2181(CONNECTED) 1] ls2 / [zookeeper] cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1
(1)czxid:創建節點的事務 zxid
每次修改 ZooKeeper 狀態都會產生一個 ZooKeeper 事務 ID。事務 ID 是 ZooKeeper 中所 有修改總的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之 前發生。
(2)ctime:znode 被創建的毫秒數(從 1970 年開始)
(3)mzxid:znode 最后更新的事務 zxid
(4)mtime:znode 最后修改的毫秒數(從 1970 年開始)
(5)pZxid:znode 最后更新的子節點 zxid (存儲的是樹形結構)
(6)cversion:znode 子節點變化號,znode 子節點修改次數
(7)dataversion:znode 數據變化號
(8)aclVersion:znode 訪問控制列表的變化號
(9)ephemeralOwner:如果是臨時節點,這個是 znode 擁有者的 session id。如果不是 臨時節點則是 0。
(10)dataLength:znode 的數據長度
(11)numChildren:znode 子節點數量
1)分別創建2個普通節點(永久節點)還是用的漫威的例子
[zk: hadoop102:2181(CONNECTED) 7] create /hero "ironman" Created /hero [zk: hadoop102:2181(CONNECTED) 9] create /hero/Marvel "Hulk" Created /hero/Marvel2)獲得節點的值
[zk: hadoop102:2181(CONNECTED) 12] get -s /hero ironman cZxid = 0x300000006 ctime = Sat Nov 27 08:07:39 CST 2021 mZxid = 0x300000006 mtime = Sat Nov 27 08:07:39 CST 2021 pZxid = 0x300000007 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 7 numChildren = 1 [zk: hadoop102:2181(CONNECTED) 13] get -s /hero/Marvel Hulk cZxid = 0x300000007 ctime = Sat Nov 27 08:10:48 CST 2021 mZxid = 0x300000007 mtime = Sat Nov 27 08:10:48 CST 2021 pZxid = 0x300000007 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 0
3)創建帶序號的節點(永久節點 + 帶序號)
那么帶序號和不帶序號的節點有什么區別嗎?
帶序號的節點可以創建相同名稱的節點,序號自動加1,不帶序號的不能重復創建同名結點。
驗證退出客戶端,節點是否還是存在
節點沒有被刪除
如果原來沒有序號節點,序號從 0 開始依次遞增。如果原節點下已有 2 個節點,則再排 序時從 2 開始,以此類推。
退出后,發現臨時節點消失
9)修改節點數據值
[zk: hadoop102:2181(CONNECTED) 3] set -s /hero/DC "DC"3.2.4 監聽器原理
客戶端注冊監聽它關心的目錄節點,當目錄節點發生變化(數據改變、節點刪除、子目 錄節點增加刪除)時,ZooKeeper 會通知客戶端。監聽機制保證 ZooKeeper 保存的任何的數 據的任何改變都能快速的響應到監聽了該節點的應用程序。
1、監聽原理詳解
- 1)首先在要有一個main()線程
- 2)在main線程中創建Zookeeper客戶端,這時就會創建兩個線程,一個負責網絡連接通信(connet),一個負責監聽(listener)。
- 3)通過connect線程將注冊的監聽事件發送給Zookeeper。
- 4)在Zookeeper的注冊監聽器列表中將注冊的監聽事件添加到列表中。
- 5)Zookeeper監聽到有數據或路徑變化,就會將這個消息發送給listener線程。
- 6)listener線程內部調用了process()方法。
2、常見的監聽
1)監聽節點數據的變化
get path [watch]
2)監聽子節點增減的變化
ls path [watch]
10)節點的值變化監聽
(1)在hadoop104主機上注冊監聽/hero節點數據變化
(2)在hadoop103主機上修改/sanguo節點的數據
[zk: localhost:2181(CONNECTED) 0] set /hero "Avengers"(3)觀察hadoop104主機收到數據變化的監聽
WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/hero
再次修改數據,hadoop104無變化,監聽不到
注意:在hadoop103再多次修改/hero的值,hadoop104上不會再收到監聽。因為注冊 一次,只能監聽一次。想再次監聽,需要再次注冊。
2)節點的子節點變化監聽(路徑變化)
(1)在hadoop104主機上注冊監聽/sanguo節點的子節點變化
(2)在hadoop103主機/sanguo節點上創建子節點
[zk: localhost:2181(CONNECTED) 0] create /hero/Disney "Woody" Created /hero/Disney(3)觀察hadoop104主機收到子節點變化的監聽
WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/hero
同理,再在hadoop103創建子節點,hadoop104上不會再收到監聽。因為注冊一次,只能監聽一次。想再次監聽,需要再次注冊。
注意:節點的路徑變化,也是注冊一次,生效一次。想多次生效,就需要多次注冊。
3.2.5 節點刪除與查找
12)刪除節點
[zk: localhost:2181(CONNECTED) 7] delete /hero/Disney2)遞歸刪除節點
[zk: localhost:2181(CONNECTED) 15] deleteall/hero3)查看節點狀態
[zk: localhost:2181(CONNECTED) 17] stat /zookeeper3.3 客戶端API應用
3.3.1 IDEA環境搭建
1)創建一個Maven工程
2)添加pom文件
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version></dependency> </dependencies>
3)拷貝log4j.properties文件到項目根目錄
需要在項目的src/main/resources目錄下,新建一個文件,命名為“log4j.properties”,在文件中填入。
3.3.2 創建ZooKeeper客戶端
public class zkClient {// 注意:逗號左右不能有空格private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTimeout = 2000;private ZooKeeper zkClient;@Beforepublic void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}}3.3.3 創建子節點
@Testpublic void create() throws KeeperException, InterruptedException {String nodeCreated = zkClient.create("/leokadia", "sa.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//創建持久節點}
在hadoop103上驗證是否創建對應節點,創建成功
3.3.4 獲取子節點并監聽節點變化
@Test public void getChildren() throws Exception {List<String> children = zkClient.getChildren("/", true);for (String child : children) {System.out.println(child);}// 延時阻塞Thread.sleep(Long.MAX_VALUE); }
成功打印節點
在集群中創建節點,希望在控制臺上有變化所以需要加一個延時
但由于這里只注冊監聽一次,就失效了,故將其移到上面代碼中
在控制臺創建節點
成功監聽
刪除節點也能順利監聽
3.3.5 判斷Znode是否存在
@Test public void exist() throws Exception {Stat stat = zkClient.exists("/eclipse", false);System.out.println(stat == null ? "not exist" : "exist"); }
在集群中將其刪除
再次運行代碼,不存在
3.4 客戶端向服務端寫數據流程
3.4.1 寫流程之寫入請求直接發送給Leader節點
寫請求給leader,leader將寫請求通知follower,follower寫完之后回復leader以ack,當超過半數的寫完了,就回復客戶端寫完了,其余沒寫完的繼續寫
3.4.2 寫流程之寫入請求發送給follower節點
follower沒有寫權限,將寫請求送給有權限的Leader,Leader先自己寫一份,寫完后,傳達寫命令給其他的Follower節點,讓他們寫,當超過半數的寫完了,Leader就回復最先收到請求的Follower,整個集群數據寫完了,Follower回復客戶端寫完了,其余沒寫完的小半數Follower繼續寫
4 監聽服務器節點動態上下線案例
1)需求
某分布式系統中,主節點可以有多臺,可以動態上下線,任意一臺客戶端都能實時感知到主節點服務器的上下線。
2)需求分析
第一臺服務器上線,在zookeeper上創建對應的節點servers/server1 hadoop101 80 nodes,在節點上提供的信息為,該節點主機的名稱為hadoop101,當前已經連接了80個客戶端(即為該節點存儲的數據)
第二臺服務器上線,在zookeeper上創建對應的節點servers/server2 hadoop102 90 nodes,在節點上提供的信息為,該節點主機的名稱為hadoop102,當前已經連接了90個客戶端(即為該節點存儲的數據)
第三臺服務器上線,在zookeeper上創建對應的節點servers/server3 hadoop103 95 nodes,在節點上提供的信息為,該節點主機的名稱為hadoop103,當前已經連接了95個客戶端(即為該節點存儲的數據)
總結:上線服務器就是在zookeeper集群創建服務器節點的操作
回顧之前API操作,上面服務器的操作是create操作,下面是-w監聽操作
3)具體實現
先將集群之前測試用的多于的節點都刪除,只留zookeeper節點
(0)先在集群上創建/servers節點
[zk: localhost:2181(CONNECTED) 11] create /servers “servers”
Created /servers
(1)服務器端向Zookeeper注冊代碼
package com.leokadia.case1;import org.apache.zookeeper.*;import java.io.IOException;public class DistributeServer {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTimeout = 2000;private ZooKeeper zk;public static void main(String[] args) throws IOException, KeeperException, InterruptedException {DistributeServer server = new DistributeServer();// 1 獲取zk連接server.getConnect();// 2 注冊服務器到zk集群server.regist(args[0]);// 3 啟動業務邏輯(等待)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}// 2 注冊服務器到zk集群,創建對應的路徑,在節點上輸入對應主機名稱private void regist(String hostname) throws KeeperException, InterruptedException {String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//路徑 主機名稱 權限,上線就有,沒上線就沒有,且有對應的編號,因此創建臨時帶編號節點-e-sSystem.out.println(hostname +" is online") ;}private void getConnect() throws IOException {//連接器連接上對應的zookeeperzk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});} }(2)客戶端代碼
package com.leokadia.case1;import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper;import java.io.IOException; import java.util.ArrayList; import java.util.List;public class DistributeClient {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTimeout = 2000;private ZooKeeper zk;public static void main(String[] args) throws IOException, KeeperException, InterruptedException {DistributeClient client = new DistributeClient();// 1 獲取zk連接client.getConnect();// 2 監聽/servers下面子節點的增加和刪除client.getServerList();// 3 業務邏輯(睡覺)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws KeeperException, InterruptedException {List<String> children = zk.getChildren("/servers", true);ArrayList<String> servers = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers/" + child, false, null);servers.add(new String(data));}// 打印System.out.println(servers);}private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}});} }4.4測試
1)在 Linux 命令行上操作增加減少服務器
(1)啟動 DistributeClient 客戶端
(2)在hadoop102上zk的客戶端/servers 目錄上創建臨時帶序號節點
[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop102 "hadoop102" [zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop103 "hadoop103"(3)觀察 Idea 控制臺變化 [hadoop102, hadoop103]
(4)執行刪除操作
[zk: localhost:2181(CONNECTED) 8] delete /servers/hadoop1020000000000(5)觀察 Idea 控制臺變化 [hadoop103]
2)在 Idea 上操作增加減少服務器
(1)啟動 DistributeClient 客戶端(如果已經啟動過,不需要重啟)
(2)啟動 DistributeServer 服務端
①點擊 Edit Configurations…
②在彈出的窗口中(Program arguments)輸入想啟動的主機,例如,hadoop102
運行server
查看客戶端,檢測到剛剛下線的hadoop102上線了
更換參數hadoop104
客戶端檢測
5. 分布式鎖的概念
5.1 分布式鎖概念
分布式鎖:
"進程 1"在使用該資源的時候,會先去獲得鎖,"進程 1"獲得鎖以后會對該資源保持獨占,這樣其他進程就無法訪問該資源,"進程 1"用完該資源以后就將鎖釋放掉,讓其他進程來獲得鎖,那么通過這個鎖機制,我們就能保證了分布式系統中多個進程能夠有序的訪問該臨界資源。那么我們把這個分布式環境下的這個鎖叫作分布式鎖。
1)接收到請求后,在/locks節點下創建一個臨時順序節點
客戶端訪問集群,創建臨時帶序號的節點,有n多個客戶端,在這個目錄下創建自己的節點
2)判斷自己是不是當前節點下最小的節點:是,獲取到鎖;不是,對前一個節點進行監聽
3)獲取到鎖,處理完業務后,delete節點釋放鎖,然后下面的節點將收到通知,重復第二步判斷
客戶端在目錄下創建臨時帶序號的節點,序號小的節點優先拿到鎖進行相關業務的操作,其他節點如果發現自己不是序號最小的節點就監聽前一個節點,前一個節點如果釋放,則立即獲得這把鎖
5.2 原生 Zookeeper 實現分布式鎖案例
1)分布式鎖實現
package com.leokadia.case2;import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;public class DistributedLock {private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private final int sessionTimeout = 2000;private final ZooKeeper zk;private CountDownLatch connectLatch = new CountDownLatch(1);private CountDownLatch waitLatch = new CountDownLatch(1);private String waitPath;private String currentMode;public DistributedLock() throws IOException, InterruptedException, KeeperException {// 獲取連接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch 如果連接上zk 可以釋放if (watchedEvent.getState() == Event.KeeperState.SyncConnected){connectLatch.countDown();}// waitLatch 需要釋放if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){waitLatch.countDown();}}});// 等待zk正常連接后,往下走程序connectLatch.await();// 判斷根節點/locks是否存在Stat stat = zk.exists("/locks", false);if (stat == null) {// 創建一下根節點zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}// 對zk加鎖 判斷是否是序號最小public void zklock() {// 創建對應的臨時帶序號節點try {currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// wait一小會, 讓結果更清晰一些Thread.sleep(10);// 判斷創建的節點是否是最小的序號節點,如果是獲取到鎖;如果不是,監聽他序號前一個節點List<String> children = zk.getChildren("/locks", false);// 如果children 只有一個值,那就直接獲取鎖; 如果有多個節點,需要判斷,誰最小if (children.size() == 1) {return;} else {Collections.sort(children);// 獲取節點名稱 seq-00000000String thisNode = currentMode.substring("/locks/".length());// 通過seq-00000000獲取該節點在children集合的位置int index = children.indexOf(thisNode);// 判斷if (index == -1) {System.out.println("數據異常");} else if (index == 0) {// 就一個節點,可以獲取鎖了return;} else {// 需要監聽 他前一個節點變化waitPath = "/locks/" + children.get(index - 1);zk.getData(waitPath,true,new Stat());// 等待監聽waitLatch.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}// 解鎖public void unZkLock() {// 刪除節點try {zk.delete(this.currentMode,-1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}} }2)分布式鎖測試
(1)創建兩個線程
(2)觀察控制臺變化:
也有可能線程2先獲取到鎖
5.3 Curator 框架實現分布式鎖案例
1)原生的 Java API 開發存在的問題
(1)會話連接是異步的,需要自己去處理。比如使用 CountDownLatch
(2)Watch 需要重復注冊,不然就不能生效
(3)開發的復雜性還是比較高的
(4)不支持多節點刪除和創建。需要自己去遞歸
2)Curator 是一個專門解決分布式鎖的框架,解決了原生 JavaAPI 開發分布式遇到的問題。 詳情請查看官方文檔:https://curator.apache.org/index.html
3)Curator 案例實操
(1)添加依賴
(2)代碼實現
package com.leokadia.case3;import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {public static void main(String[] args) {// 創建分布式鎖1InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");// 創建分布式鎖2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("線程1 獲取到鎖");lock1.acquire();System.out.println("線程1 再次獲取到鎖");Thread.sleep(5 * 1000);lock1.release();System.out.println("線程1 釋放鎖");lock1.release();System.out.println("線程1 再次釋放鎖");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("線程2 獲取到鎖");lock2.acquire();System.out.println("線程2 再次獲取到鎖");Thread.sleep(5 * 1000);lock2.release();System.out.println("線程2 釋放鎖");lock2.release();System.out.println("線程2 再次釋放鎖");} catch (Exception e) {e.printStackTrace();}}}).start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();// 啟動客戶端client.start();System.out.println("zookeeper 啟動成功");return client;} }(2)觀察控制臺變化:
線程 1 獲取鎖
線程 1 再次獲取鎖
線程 1 釋放鎖
線程 1 再次釋放鎖
線程 2 獲取鎖
線程 2 再次獲取鎖
線程 2 釋放鎖
線程 2 再次釋放鎖
第4章 Zookeeper內部原理
這一部分對上述曾經提到過的內部原理進行總結,并闡述Zookeeper的一些算法基礎
6.1 節點類型
持久(Persistent):客戶端和服務器端斷開連接后,創建的節點不刪除
短暫(Ephemeral):客戶端和服務器端斷開連接后,創建的節點自己刪除
(1)持久化目錄節點:客戶端與Zookeeper斷開連接后,該節點依舊存在
(2)持久化順序編號目錄節點:客戶端與Zookeeper斷開連接后,該節點依舊存在,只是Zookeeper給該節點名稱進行順序編號
(3)臨時目錄節點:客戶端與Zookeeper斷開連接后,該節點被刪除
(4)臨時順序編號目錄節點:客戶端與 Zookeeper 斷開連接后,該節點被刪除,只是 Zookeeper給該節點名稱進行順序編號。
說明:創建znode時設置順序標識,znode名稱后會附加一個值,順序號是一個單調遞增的計數器,由父節點維護
注意:在分布式系統中,順序號可以被用于為所有的事件進行全局排序,這樣客戶端可以通過順序號推斷事件的順序
6.2 Stat結構體(節點的詳細數據)
(1)czxid-創建節點的事務zxid
每次修改ZooKeeper狀態都會收到一個zxid形式的時間戳,也就是ZooKeeper事務ID。
事務ID是ZooKeeper中所有修改總的次序。每個修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前發生。
(2)ctime - znode被創建的毫秒數(從1970年開始)
(3)mzxid - znode最后更新的事務zxid
(4)mtime - znode最后修改的毫秒數(從1970年開始)
(5)pZxid-znode最后更新的子節點zxid
(6)cversion - znode子節點變化號,znode子節點修改次數
(7)dataversion - znode數據變化號
(8)aclVersion - znode訪問控制列表的變化號
(9)ephemeralOwner- 如果是臨時節點,這個是znode擁有者的session id。如果不是臨時節點則是0。
(10)dataLength- znode的數據長度
(11)numChildren - znode子節點數量
6.3 監聽器原理
客戶端注冊監聽它關心的目錄節點,當目錄節點發生變化(數據改變、節點刪除、子目 錄節點增加刪除)時,ZooKeeper 會通知客戶端。監聽機制保證 ZooKeeper 保存的任何的數 據的任何改變都能快速的響應到監聽了該節點的應用程序。
1、監聽原理詳解
1)首先在要有一個main()線程
2)在main線程中創建Zookeeper客戶端,這時就會創建兩個線程,一個負責網絡連接通信(connet),一個負責監聽(listener)。
3)通過connect線程將注冊的監聽事件發送給Zookeeper。
4)在Zookeeper的注冊監聽器列表中將注冊的監聽事件添加到列表中。
5)Zookeeper監聽到有數據或路徑變化,就會將這個消息發送給listener線程。
6)listener線程內部調用了process()方法。
2、常見的監聽
1)監聽節點數據的變化
get path [watch]
2)監聽子節點增減的變化
ls path [watch]
6.4 選舉機制
(1)半數機制:集群中半數以上機器存活,集群可用。所以Zookeeper適合安裝奇數臺服務器。
(2)Zookeeper雖然在配置文件中并沒有指定Master和Slave。但是,Zookeeper工作時,是有一個節點為Leader,其他則為Follower,Leader是通過內部的選舉機制臨時產生的。
(3)以一個簡單的例子來說明整個選舉的過程。
假設有五臺服務器組成的Zookeeper集群,它們的id從1-5,同時它們都是最新啟動的,也就是沒有歷史數據,在存放數據量這一點上,都是一樣的。假設這些服務器依序啟動,來看看會發生什么。
Zookeeper的選舉機制
6.4.1 第一次啟動
(1)服務器1啟動,發起一次選舉。服務器1投自己一票。此時服務器1票數一票,不夠半數以上(3票),選舉無法完成,服務器1狀態保持為LOOKING;
(2)服務器2啟動,再發起一次選舉。服務器1和2分別投自己一票并交換選票信息:此時服務器1發現服務器2的myid比自己目前投票推舉的(服務器1)大,更改選票為推舉服務器2。此時服務器1票數0票,服務器2票數2票,沒有半數以上結果,選舉無法完成,服務器1,2狀態保持LOOKING
(3)服務器3啟動,發起一次選舉。先投自己一票,此時服務器1和2發現服務器3的myid比自己目前投票推舉的(服務器2)大,都會更改選票為服務器3。此次投票結果:服務器1為0票,服務器2為0票,服務器3為3票。此時服務器3的票數已經超過半數,服務器3當選Leader。服務器1,2更改狀態為FOLLOWING,服務器3更改狀態為LEADING;
(4)服務器4啟動,發起一次選舉。此時服務器1,2,3已經不是LOOKING狀態,不會更改選票信息。交換選票信息結果:服務器3為3票,服務器4為1票。此時服務器4服從多數,更改選票信息為服務器3,并更改狀態為FOLLOWING;
(5)服務器5啟動,同4一樣當小弟。
總結:先選自己,選完之后選票不夠,就會把選票投給myid大的節點
一旦選舉成功后,其他節點自動變成follower狀態,再次啟動的節點
相關概念:
SID:服務器ID。用來唯一標識一臺 ZooKeeper集群中的機器,每臺機器不能重復,和myid一致。
ZXID:事務ID。ZXID是一個事務ID,用來標識一次服務器狀態的變更。在某一時刻, 集群中的每臺機器的ZXID值不一定完全一致,這和ZooKeeper服務器對于客戶端“更新請求”的處理邏輯有關。
Epoch:每個Leader任期的代號。沒有Leader時同一輪投票過程中的邏輯時鐘值是相同的。每投完一次票這個數據就會增加
6.4.2 非第一次啟動
(1)當ZooKeeper集群中的一臺服務器出現以下兩種情況之一時,就會開始進入Leader選舉:
? 服務器初始化啟動。
? 服務器運行期間無法和Leader保持連接。
(2)而當一臺機器進入Leader選舉流程時,當前集群也可能會處于以下兩種狀態:
? 集群中本來就已經存在一個Leader。
對于第一種已經存在Leader的情況,機器試圖去選舉Leader時,會被告知當前服務器的Leader信息,對于該機器來說,僅僅需要和Leader機器建立連接,并進行狀態同步即可。
? 集群中確實不存在Leader。
假設ZooKeeper由5臺服務器組成,SID分別為1、2、3、4、5,ZXID分別為8、8、8、7、7,并且此時SID為3的服務器是Leader。某一時刻, 3和5服務器出現故障,因此開始進行Leader選舉。
選舉Leader規則: ①EPOCH大的直接勝出 ②EPOCH相同,事務id大的勝出 ③事務id相同,服務器id大的勝出
6.5 寫數據流程
6.5.1 寫流程之寫入請求直接發送給Leader節點
寫請求給leader,leader將寫請求通知follower,follower寫完之后回復leader以ack,當超過半數的寫完了,就回復客戶端寫完了,其余沒寫完的繼續寫
6.5.2 寫流程之寫入請求發送給follower節點
follower沒有寫權限,將寫請求送給有權限的Leader,Leader先自己寫一份,寫完后,傳達寫命令給其他的Follower節點,讓他們寫,當超過半數的寫完了,就回復客戶端寫完了,其余沒寫完的繼續寫
6.6 拜占庭將軍問題
拜占庭將軍問題是一個協議問題,拜占庭帝國軍隊的將軍們必須全體一致的決定是否攻擊某一支敵軍。問題是這些將軍在地理上是分隔開來的,并且將 軍中存在叛徒。叛徒可以任意行動以達到以下目標:欺騙某些將軍采取進攻行動;促成一個不是所有將軍都同意的決定,如當將軍們不希望進攻時促成進攻 行動;或者迷惑某些將軍,使他們無法做出決定。如果叛徒達到了這些目的之一,則任何攻擊行動的結果都是注定要失敗的,只有完全達成一致的努力才能獲得勝利。
這個問題實際上就是多臺服務器處理某一個一致性問題,解決分布式一致性的經典算法之一是下文中的Paxos算法。
6.4 Paxos算法
Paxos算法是一種基于消息傳遞且具有高度容錯特性的一致性算法。
分布式系統中的節點通信存在兩種模型:共享內存(Shared memory)和消息傳遞(Messages passing)。基于消息傳遞通信模型的分布式系統,不可避免的會發生以下錯誤:進程可能會慢、被殺死或者重啟,消息可能會延遲、丟失、重復,在基礎 Paxos 場景中,先不考慮可能出現消息篡改即拜占庭錯誤的情況。Paxos 算法解決的問題是在一個可能發生上述異常的分布式系統中如何就某個值達成一致,保證不論發生以上任何異常,都不會破壞決議的一致性。
Paxos算法描述:
? 在一個Paxos系統中,首先將所有節點劃分為Proposer(提議者),Acceptor(接受者),和 Learner(學習者)。(注意:每個節點都可以身兼數職)。
一個完整的Paxos算法流程分為三個階段:
? Prepare準備階段
? Proposer向多個Acceptor發出Propose請求Promise(承諾)
? Acceptor針對收到的Propose請求進行Promise(承諾)
? Accept接受階段
? Proposer收到多數Acceptor承諾的Promise后,向Acceptor發出Propose請求
? Acceptor針對收到的Propose請求進行Accept處理
? Learn學習階段:Proposer將形成的決議發送給所有Learners
(1)Prepare: Proposer生成全局唯一且遞增的Proposal ID (可使用時間戳加Server ID),向所有Acceptors發送Prepare請求,這里無需攜帶提案內容,只攜帶Proposal ID即可。
(2)Promise: Acceptors收到Prepare請求后,做出“兩個承諾,一個應答”。
兩個承諾:
? 不再接受Proposal ID小于等于(注意:這里是<= )當前請求的Prepare請求。
? 不再接受Proposal ID小于(注意:這里是< )當前請求的Propose請求。
一個應答:
? 不違背以前做出的承諾下,回復已經Accept過的提案中Proposal ID最大的那個提案的Value和Proposal ID,沒有則返回空值。
(3)Propose: Proposer 收到多數Acceptors的Promise應答后,從應答中選擇Proposal ID最大的提案的Value,作為本次要發起的提案。如果所有應答的提案Value均為空值,則可以自己隨意決定提案Value。然后攜帶當前Proposal ID,向所有Acceptors發送Propose請求。
(4)Accept: Acceptor收到Propose請求后,在不違背自己之前做出的承諾下,接受并持久化當前Proposal ID和提案Value。
(5)Learn: Proposer收到多數Acceptors的Accept后,決議形成,將形成的決議發送給所有Learners。
下面我們針對上述描述做三種情況的推演舉例:為了簡化流程,我們這里不設置Learner。
總結
以上是生活随笔為你收集整理的超全zookeeper知识点与实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Altium Designer10铺铜技
- 下一篇: 程序员的修仙之路-筑基篇