ZooKeeper Notes + Source Code Analysis
Continuously updated — let's go! Series link:
https://blog.csdn.net/qq_35349982/category_10317485.html
zookeeper
1. Introduction
ZooKeeper is a solution for distributed data consistency. Distributed applications can build features such as data publish/subscribe, load balancing, naming services, cluster management, distributed locks, and distributed queues on top of it.
2. Installation
2.1 Standalone installation
1. Download
```
cd /usr/local/src    # go to the target directory
# download ZooKeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
```
2. Extract
```
tar -zxvf zookeeper-3.4.13.tar.gz
cd zookeeper-3.4.13
```
3. Edit the configuration file
```
# enter the conf directory
cd zookeeper-3.4.13/conf
# copy zoo_sample.cfg to zoo.cfg (the file must have exactly this name)
cp zoo_sample.cfg zoo.cfg
# create the data and log directories
cd /usr/local/src/zookeeper
mkdir data
mkdir log
# edit zoo.cfg
vi zoo.cfg
# change dataDir and add dataLogDir
dataDir=/usr/local/src/zookeeper/data
dataLogDir=/usr/local/src/zookeeper/log
```
4. Configure environment variables
```
export ZOOKEEPER_INSTALL=/usr/local/src/zookeeper
export PATH=$PATH:$ZOOKEEPER_INSTALL/bin
```
5. Start
```
cd /usr/local/src/zookeeper/bin
./zkServer.sh start
```
2.2 Cluster installation
1. Rename
```
mv zookeeper-3.4.14 zookeeper01
```
2. Make several copies
```
cp -r zookeeper01/ zookeeper02
cp -r zookeeper01/ zookeeper03
```
3. Edit the configuration files
```
# create two directories
mkdir data
mkdir logs
# rename the configuration file
cd conf
mv zoo_sample.cfg zoo.cfg
```
4. Write the configuration (edit all three configuration files)
Give each instance its own clientPort and its own data/log directories, e.g. for the second instance:
```
clientPort=2181
dataDir=/usr/local/src/zookeeper02/data
dataLogDir=/usr/local/src/zookeeper02/logs
```
5. Edit the cluster configuration
Look up the IP address first (on a cloud host the public server IP cannot be used; take the internal address from ifconfig):
```
ifconfig
```
In each instance's data directory, create a myid file whose content is 1, 2, and 3 respectively — this file records each server's ID.
```
touch myid
```
Then, in each instance's zoo.cfg, configure the cluster server IPs:
```
server.1=172.17.153.160:2881:3881
server.2=172.17.153.160:2882:3882
server.3=172.17.153.160:2883:3883
# server.<server ID>=<server IP>:<quorum communication port>:<leader election port>
```
After starting (next step), verify with jps:
```
jps    # an entry like "6456 QuorumPeerMain" means the server started successfully
```
6. Start
```
./zkServer.sh start     # start
./zkServer.sh status    # check status
./zkServer.sh stop      # stop
```
Installation troubleshooting
1. Free the occupied port (in ZooKeeper 3.5+ the embedded AdminServer listens on 8080; the client port is 2181):
```
netstat -nltp | grep 2181
kill -9 3027    # kill the PID found above
```
2. Check that the required ports are open and that the firewall allows them.
3. Check the logs:
```
./zkServer.sh start-foreground    # run in the foreground and print the logs
```
4. There is a .out file in the log directory; check the error messages inside it.
5. If a port was occupied, delete all the old files under the data and logs directories.
Reference links:
https://blog.csdn.net/Hello_World_QWP/article/details/90765608
https://blog.csdn.net/qq_36651243/article/details/89396618
https://www.jianshu.com/p/0335f1f41420
https://www.cnblogs.com/zimo-jing/p/9037853.html
https://blog.csdn.net/weixin_45793065/article/details/106709479
3. Basic Commands
3.1 Command line
1.1 Create nodes
```
# start the CLI
./zkCli.sh
# list the current nodes
ls /
# create a sequential node
create -s /zk-test 123
# create an ephemeral node (removed when the client session closes)
create -e /zk-temp 123
# create a persistent node
create /zk-permanent 123
```
1.2 Read nodes
```
# get the node's data and status information
get /zk-permanent
```
1.3 Update and delete nodes
```
get /zk-permanent
# update
set /zk-permanent 456
# delete (fails if the node has children; delete the children first)
delete /zk-permanent
```
3.2 The ZooKeeper Class
```xml
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.2</version>
</dependency>
```
2.1 Connect and create
```java
import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateNote implements Watcher {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;

    /* Establish a session */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        /*
         A client connects to the server by creating a ZooKeeper instance:
             new ZooKeeper(connectString, sessionTimeout, watcher)
         connectString  : server address, IP:port
         sessionTimeout : session timeout in milliseconds
         watcher        : listener; ZooKeeper notifies the client through it
                          when a watched event fires
        */
        zooKeeper = new ZooKeeper("47.95.1.96:2181", 5000, new CreateNote());
        System.out.println(zooKeeper.getState());
        // a CountDownLatch could also be used to keep main alive and blocked:
        // countDownLatch.await();
        Thread.sleep(Integer.MAX_VALUE);
    }

    /* Callback: handles watcher notifications from the server */
    public void process(WatchedEvent watchedEvent) {
        // SyncConnected
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            // the session is established; release the latch and create the node
            System.out.println("process() was invoked...");
            try {
                createNoteSync();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /* Create nodes */
    private static void createNoteSync() throws KeeperException, InterruptedException {
        /*
         * path       : path of the node to create
         * data[]     : data stored in the node, as a byte array
         * acl        : the node's ACL (4 predefined lists)
         *      ANYONE_ID_UNSAFE : anyone
         *      AUTH_IDS         : only usable when setting ACLs; replaced by the client's authenticated ids
         *      OPEN_ACL_UNSAFE  : a completely open ACL (most common) --> world:anyone
         *      CREATOR_ALL_ACL  : grants the creator's authenticated id all permissions
         * createMode : node type (4 types)
         *      PERSISTENT            : persistent node
         *      PERSISTENT_SEQUENTIAL : persistent sequential node
         *      EPHEMERAL             : ephemeral node
         *      EPHEMERAL_SEQUENTIAL  : ephemeral sequential node
         *
         * String node = zookeeper.create(path, data, acl, createMode);
         */
        String note_persistent = zooKeeper.create("/my-persistent", "persistent node content".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("node created");

//        // persistent node
//        String note_persistent = zooKeeper.create("/lg-persistent", "persistent node content".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//        // ephemeral node
//        String note_ephemeral = zooKeeper.create("/lg-ephemeral", "ephemeral node content".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//        // persistent sequential node
//        String note_persistent_sequential = zooKeeper.create("/lg-persistent_sequential", "persistent sequential node content".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
//        System.out.println("created persistent node " + note_persistent);
//        System.out.println("created ephemeral node " + note_ephemeral);
//        System.out.println("created persistent sequential node " + note_persistent_sequential);
    }
}
```
2.2 Query and update
```java
/* Update a node's data */
private void updateNoteSync() throws KeeperException, InterruptedException {
    /*
     path    : node path
     data    : new content, as byte[]
     version : -1 means "modify the latest version, whatever it is"
     zooKeeper.setData(path, data, version);
    */
    // write initial data
    Stat stat = zooKeeper.setData("/my-persistent", "content before the update".getBytes(), -1);
    // read it back
    byte[] data = zooKeeper.getData("/my-persistent", false, null);
    System.out.println("value before the update: " + new String(data));
    // update
    Stat stat1 = zooKeeper.setData("/my-persistent", "content after the update".getBytes(), -1);
    // read it back again
    byte[] data2 = zooKeeper.getData("/my-persistent", false, null);
    System.out.println("value after the update: " + new String(data2));
}
```
2.3 Delete
```java
/* Delete a node */
private void deleteNoteSync() throws KeeperException, InterruptedException {
    /*
     zooKeeper.exists(path, watch)   : check whether a node exists
     zooKeeper.delete(path, version) : delete a node
    */
    Stat stat = zooKeeper.exists("/lg-persistent/c1", false);
    System.out.println(stat == null ? "the node does not exist" : "the node exists");
    if (stat != null) {
        zooKeeper.delete("/lg-persistent/c1", -1);
    }
    Stat stat2 = zooKeeper.exists("/lg-persistent/c1", false);
    System.out.println(stat2 == null ? "the node does not exist" : "the node exists");
}
```
3.3 Curator
```xml
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
```
3.1 Connect
```java
// retry policy, without the fluent style
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
// fluent style
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("47.95.1.96:2181")
        .sessionTimeoutMs(50000)
        .connectionTimeoutMs(30000)
        .retryPolicy(exponentialBackoffRetry)
        .namespace("base")   // isolated namespace /base
        .build();
client.start();
System.out.println("session 2 created");

// create a node
String path = "/lg-curator/c1";
String s = client.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .forPath(path, "init".getBytes());
System.out.println("node (and missing parents) created at " + s);
```
3.2 Create a node and read its status information
```java
// create a node
String path = "/lg-curator/c1";
String s = client.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .forPath(path, "init".getBytes());
System.out.println("node (and missing parents) created at " + s);

// read the node's data and status information
// data content
byte[] bytes = client.getData().forPath(path);
System.out.println("node data: " + new String(bytes));
// status information
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
System.out.println("node status: " + stat);
```
3.3 Update
```java
// update the node's content
int version = client.setData()
        .withVersion(stat.getVersion())
        .forPath(path, "new content 1".getBytes())
        .getVersion();
System.out.println("the latest version is " + version);
byte[] bytes2 = client.getData().forPath(path);
System.out.println("node data after the update: " + new String(bytes2));

// reusing the stale version number now throws BadVersionException
client.setData().withVersion(stat.getVersion()).forPath(path, "new content 2".getBytes());
```
3.4 Delete
```java
// delete a node together with its children
String path = "/lg-curator";
client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
System.out.println("deleted node " + path);
```
3.5 Three kinds of listeners
NodeCache: watches a single node; it fires on create, update, and delete of the node at the given path.
PathChildrenCache: watches the direct (first-level) children of the given path; it does not fire for operations on the node itself, only for create, update, and delete of its children.
TreeCache: treats the given path as the root and watches operations on all of its descendants, presenting a tree-shaped view; the watch depth can be configured, up to 2147483647 (Integer.MAX_VALUE).
1. PathChildrenCache example
```java
// using PathChildrenCache
// create the cache that watches all children of /serviceList
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, "/serviceList", false);
// asynchronous initialization
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
pathChildrenCache.getListenable().addListener(new CilentListener());

// ====================
public class CilentListener implements PathChildrenCacheListener {
    @Override
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
        if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
            System.out.println(pathChildrenCacheEvent.getData().getPath());
        }
        if (type.equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
            System.out.println("added " + pathChildrenCacheEvent.getData().getPath());
            // path of the newly added node
            String path = pathChildrenCacheEvent.getData().getPath();
            String[] split = path.split("/");
            // look up the server address stored in the node
            String serviceValue = CuratorUtils.findServiceRegister(pathChildrenCacheEvent.getData().getPath());
            // add the entry to the map
            Map<String, String> serverAddressMap = ConsumerBoot.serverAddressMap;
            serverAddressMap.put(split[split.length - 1], serviceValue);
        }
        if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
            String path = pathChildrenCacheEvent.getData().getPath();
            String[] split = path.split("/");
            System.out.println("removed " + pathChildrenCacheEvent.getData().getPath());
            // remove the entry from the map
            Map<String, String> serverAddressMap = ConsumerBoot.serverAddressMap;
            serverAddressMap.remove(split[split.length - 1]);
            System.out.println("entries in the map: " + serverAddressMap.size());
        }
    }
}
```
Utility class
```java
package com.lagou.zookeeper;

import com.lagou.ZKConstant;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.nio.ByteBuffer;
import java.util.*;

public class CuratorUtils {

    private static String serverList = "serviceList";
    private static int serverNum = 0;

    public static CuratorFramework build() {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                //.connectString("47.95.1.96:2181")
                .connectString("127.0.0.1:2181")
                .namespace(ZKConstant.ZK_NAMESPACE)
                //.connectionTimeoutMs(15000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
        return curatorFramework;
    }

    /**
     * Register a service.
     */
    public static CuratorFramework serviceRegister(String address, int port) throws Exception {
        // open a connection
        CuratorFramework client = CuratorUtils.build();
        // the ephemeral node stores ip:port
        String serviceAddress = address + ":" + port;
        Stat s = client.checkExists().forPath("/" + serverList);
        if (s == null) {
            // root node
            client.create().withMode(CreateMode.PERSISTENT).forPath("/" + serverList);
        }
        client.create().creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath("/" + serverList + "/" + String.valueOf(serverNum + 1), serviceAddress.getBytes());
        return client;
    }

    /**
     * List the registered services.
     */
    public static List<String> findServiceRegList() throws Exception {
        CuratorFramework client = CuratorUtils.build();
        List<String> nodeList = client.getChildren().forPath("/" + serverList);
        return nodeList;
    }

    public static void deleteServiceRegister(String path) throws Exception {
        CuratorFramework client = CuratorUtils.build();
        client.delete().forPath(path);
    }

    // read a node's content
    public static String findServiceRegister(String path) throws Exception {
        CuratorFramework client = CuratorUtils.build();
        byte[] bytes = client.getData().forPath(path);
        return new String(bytes);
    }

    /**
     * Record an event time.
     */
    public static void addServiceTime(String path, String dateStr) throws Exception {
        CuratorFramework client = CuratorUtils.build();
        if (client.checkExists().forPath(path) == null) {
            client.create().creatingParentContainersIfNeeded()
                    .withMode(CreateMode.EPHEMERAL).forPath(path, dateStr.getBytes());
        } else {
            client.setData().forPath(path, dateStr.getBytes());
        }
    }

    public static void addReponseTime(String path, String dateStr) throws Exception {
        CuratorFramework client = CuratorUtils.build();
        if (client.checkExists().forPath(path) == null) {
            client.create().creatingParentContainersIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, dateStr.getBytes());
        } else {
            client.setData().forPath(path, dateStr.getBytes());
        }
    }

    public static byte[] longToBytes(long x) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(x);
        return buffer.array();
    }

    /**
     * Return the minimum Value of a Map<K, V>.
     */
    public static Object getMinValue(Map<String, Integer> map) {
        if (map == null) return null;
        Collection<Integer> c = map.values();
        Object[] obj = c.toArray();
        Arrays.sort(obj);
        return obj[0];
    }

    public static void main(String[] args) throws Exception {
        int numbers[] = {1, 2, 5};
        Random random = new Random();
        int i = random.nextInt(numbers.length);
        System.out.println(i);
        System.out.println(numbers[i]);

        // findServiceRegList();

//        HashMap<String, Integer> stringStringHashMap = new HashMap<>();
//        stringStringHashMap.put("8999", 90);
//        stringStringHashMap.put("8998", 80);
//        stringStringHashMap.put("8997", 70);
//        stringStringHashMap.put("8996", 70);
//        List<Integer> serverServiceList = new ArrayList<Integer>(stringStringHashMap.values());
//        List<String> serverKeyList = new ArrayList<String>(stringStringHashMap.keySet());
//        System.out.println(serverServiceList);

//        try {
//            // register a service
//            CuratorFramework curatorFramework = serviceRegister("127.0.0.1", 8999);
//            // create the listener: watch all children of /serviceList
//            PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, "/serviceList", false);
//            pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);   // asynchronous initialization
//            pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
//                @Override
//                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
//                    PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//                    if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
//                        System.out.println(pathChildrenCacheEvent.getData().getPath());
//                    }
//                    if (type.equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
//                        System.out.println("added " + pathChildrenCacheEvent.getData().getPath());
//                    }
//                    if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
//                        System.out.println("removed " + pathChildrenCacheEvent.getData().getPath());
//                    }
//                }
//            });
//            Thread.sleep(Long.MAX_VALUE);

//            String testPath = "pathChildrenCacheTest";
//            // open a connection
//            CuratorFramework client = CacheListenerUtils.build();
//            // if testPath exists, delete it first
//            Stat stat = client.checkExists().forPath("/" + testPath);
//            if (stat != null) {
//                client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/" + testPath);
//            }
//            // create testPath
//            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + testPath, testPath.getBytes());
//
//            // create the PathChildrenCache
//            // "true" caches node data locally
//            PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/" + testPath, true);
//            // BUILD_INITIAL_CACHE initializes the cache synchronously
//            pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
//            pathChildrenCache.getListenable().addListener((cf, event) -> {
//                PathChildrenCacheEvent.Type eventType = event.getType();
//                switch (eventType) {
//                    case CONNECTION_RECONNECTED:
//                        pathChildrenCache.rebuild();
//                        break;
//                    case CONNECTION_SUSPENDED:
//                        break;
//                    case CONNECTION_LOST:
//                        System.out.println("Connection lost");
//                        break;
//                    case CHILD_ADDED:
//                        System.out.println("Child added");
//                        break;
//                    case CHILD_UPDATED:
//                        System.out.println("Child updated");
//                        break;
//                    case CHILD_REMOVED:
//                        System.out.println("Child removed");
//                        break;
//                    default:
//                }
//            });
//
//            // create child 1
//            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + testPath + "/1", testPath.getBytes());
//            Thread.sleep(1000);
//            // create child 2
//            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + testPath + "/2", testPath.getBytes());
//            Thread.sleep(1000);
//            // delete child 1
//            client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/" + testPath + "/1");
//            Thread.sleep(1000);
//            // delete child 2
//            client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/" + testPath + "/2");
//            Thread.sleep(1000);
//
//            pathChildrenCache.close();
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
    }
}
```
4. Application Scenarios
1. Data publish/subscribe
Publish/subscribe here is essentially a configuration center: the configuration lives in ZooKeeper and reaches the clients in one of two ways.
Push
Pull
Configuration data suited to this pattern has three characteristics:
- the data volume is small
- the data changes dynamically
- the data is shared by all machines in the cluster, and the configuration must be identical on all of them
Configuration fetch: clients read the configuration node at startup.
Configuration change: clients register a watcher so they are notified whenever the configuration node changes. A sketch of both follows.
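A minimal configuration-center sketch, assuming the curator-recipes dependency and reusing the `client` built in 3.3 Curator above; the `/config/app` path is made up. The NodeCache listener (see 3.5 above) covers both halves: the initial `getCurrentData()` is the configuration fetch, and `nodeChanged()` fires on every configuration change:

```java
// needs org.apache.curator.framework.recipes.cache.{NodeCache, NodeCacheListener, ChildData}
NodeCache configCache = new NodeCache(client, "/config/app");
configCache.start(true);   // true = build the initial cache synchronously (the "fetch")
configCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        // fires whenever /config/app is created, updated, or deleted (the "change")
        ChildData data = configCache.getCurrentData();
        if (data != null) {
            System.out.println("config changed: " + new String(data.getData()));
        }
    }
});
```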
2. Naming service
Use ZooKeeper's node-creation API to create sequential nodes; the call returns the node's complete name, which can be used to generate globally unique IDs, as in the sketch below.
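A sketch of that ID generation, again reusing the Curator `client` from 3.3; the `/ids/id-` prefix is an assumption. ZooKeeper appends a zero-padded, monotonically increasing counter to a sequential node's name, so the returned path (or its numeric suffix) is a globally unique ID:

```java
String seqPath = client.create()
        .creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
        .forPath("/ids/id-");
// e.g. seqPath == "/ids/id-0000000003"
String uniqueId = seqPath.substring(seqPath.lastIndexOf('-') + 1);
System.out.println("unique id: " + uniqueId);
```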
The remaining scenarios — cluster management, Master election, distributed locks, and distributed queues — are each covered below.
3. Cluster management
Ephemeral nodes plus watches make it possible to detect when servers join or disconnect, which is the basis for managing a cluster (the CuratorUtils service-registration class above is exactly this pattern).
Typical example: a distributed log-collection system.
4. Master election
The Master typically coordinates the other units in the cluster and has the final say over state changes in the distributed system. A common pattern: all candidates try to create the same ephemeral node; the one that succeeds is the Master, and the rest watch it and re-elect when it disappears.
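A hedged sketch of Master election with the LeaderSelector recipe from curator-recipes (the election path and the `client` instance are assumptions); Curator performs the create-node-and-watch steps internally:

```java
// needs org.apache.curator.framework.recipes.leader.{LeaderSelector, LeaderSelectorListenerAdapter}
LeaderSelector selector = new LeaderSelector(client, "/master-election",
        new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework cf) throws Exception {
                // runs only while this process is the Master;
                // returning from the method relinquishes leadership
                System.out.println("I am the Master now");
                Thread.sleep(5000);   // stands in for real coordination work
            }
        });
selector.autoRequeue();   // rejoin the candidate pool after losing leadership
selector.start();
```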
5. Distributed locks
- Exclusive lock: if transaction T1 holds an exclusive lock on data object O1, then for the duration of the lock only T1 may read or update O1; no other transaction may perform any kind of operation on it.
- Shared lock (read lock): if transaction T1 holds a shared lock on O1, T1 may only read O1, and other transactions may likewise only attach shared locks to it.
Herd effect: if every waiting client watches the lock node itself, each release wakes all waiters at once, most of which then fail to grab the lock — a notification storm. Having each client watch only the sequential node immediately before its own avoids this; the sketch below uses a recipe that does exactly that.
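A sketch of an exclusive lock with the InterProcessMutex recipe from curator-recipes (the lock path is an assumption). Internally it queues ephemeral sequential nodes and each waiter watches only its predecessor — the herd-effect fix described above. For the shared/exclusive pair there is an analogous recipe, InterProcessReadWriteLock:

```java
// needs org.apache.curator.framework.recipes.locks.InterProcessMutex and java.util.concurrent.TimeUnit
InterProcessMutex lock = new InterProcessMutex(client, "/locks/order-lock");
if (lock.acquire(10, TimeUnit.SECONDS)) {   // wait up to 10 s for the lock
    try {
        // critical section: only one client in the whole cluster runs this at a time
        System.out.println("got the lock");
    } finally {
        lock.release();
    }
}
```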
6. Distributed queues
FIFO: First In, First Out — elements are consumed in the order they were enqueued. A common implementation stores each element as a sequential child node and always takes the child with the lowest sequence number, as in the sketch below.
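A FIFO sketch with the SimpleDistributedQueue recipe from curator-recipes (the queue path is an assumption):

```java
// needs org.apache.curator.framework.recipes.queue.SimpleDistributedQueue
SimpleDistributedQueue queue = new SimpleDistributedQueue(client, "/queues/tasks");
queue.offer("task-1".getBytes());        // enqueue: becomes a sequential child node
queue.offer("task-2".getBytes());
byte[] head = queue.take();              // dequeue: lowest-numbered child first; blocks when empty
System.out.println(new String(head));    // prints task-1
```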
Distributed barrier
A Barrier model: processing waits until enough queue elements (participants) have gathered, and then they are handled together in one coordinated step; a sketch follows.
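A sketch of that gather-then-process-together model using the DistributedDoubleBarrier recipe from curator-recipes (the path and member count are assumptions):

```java
// needs org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, "/barriers/batch", 3);
barrier.enter();    // blocks until 3 participants have entered
System.out.println("all 3 members arrived, processing the batch together");
barrier.leave();    // blocks until all 3 participants have finished
```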
5. ZooKeeper Internals
5.1 The ZAB Protocol
ZAB stands for ZooKeeper Atomic Broadcast, an atomic broadcast protocol that supports crash recovery.
ZooKeeper uses ZAB to achieve distributed data consistency. The system follows a primary-backup architecture to keep the replicas consistent: a single primary process accepts and handles all client transaction requests, and then broadcasts the state changes to every replica process in the form of transaction Proposals.
ZAB makes two guarantees:
- transactions that have been committed on the Leader are eventually committed by all servers;
- transactions that were only ever proposed on the Leader (and never committed) are discarded.
5.2 Two Modes
Crash recovery and message broadcast.
5.3 Three States*
- LOOKING: the server is in the Leader-election phase
- FOLLOWING: the server acts as a Follower, staying in sync with the Leader
- LEADING: the server acts as the Leader, the main coordinating process
All processes start in the LOOKING state.
5.4 ZAB vs. Paxos
- Both have a Leader-like role that coordinates a set of Follower processes.
- In both, the Leader waits for positive feedback from more than half of the Followers before committing a proposal.
- In ZAB, every Proposal carries an epoch value identifying the current Leader's term; Paxos has an analogous identifier.
ZAB is designed mainly for highly available primary-backup systems,
whereas Paxos is mainly used to build consistent distributed state-machine systems.
5.5 Server Roles*
Leader
- The sole scheduler and handler of transaction requests, guaranteeing that cluster transactions are processed in order
- The scheduler of the other servers inside the cluster
Follower
- Handles non-transactional client requests (reads) and forwards transaction requests to the Leader
- Votes on transaction Proposals
- Votes in Leader elections
Observer
- Serves non-transactional (read) requests
- Does not take part in any form of voting (a sample Observer configuration follows)
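For reference, a sketch of how an Observer is typically configured (hostname and ports are illustrative): the Observer sets peerType in its own zoo.cfg, and every server's list marks it with the :observer suffix.

```
# in the observer's own zoo.cfg
peerType=observer

# in the server list of every zoo.cfg
server.4=172.17.153.161:2881:3881:observer
```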
6. Source Code Analysis
1. Project setup
Add the log4j configuration file location to VM options:
```
-Dlog4j.configuration=file:C:\Users\gaoyuan\Desktop\Zookeeper\Zookeeper\zookeeper_code\zookeeper_code\zookeeper-release-3.5.4\conf\log4j.properties
```
Add the zoo.cfg path to Program arguments:
```
C:\Users\gaoyuan\Desktop\Zookeeper\Zookeeper\zookeeper_code\zookeeper_code\zookeeper-release-3.5.4\conf\zoo.cfg
```
https://www.cnblogs.com/heyonggang/p/12123991.html
2. The server startup flow
The standalone startup class is ZooKeeperServerMain.
It first parses the configuration file:
```java
config.parse(args[0]);
```
and then initializes logging before building and running the server.
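An abridged outline of that startup path, paraphrased from the zookeeper-release-3.5.4 sources used above (bodies trimmed to the relevant calls, so treat it as a sketch rather than the exact code):

```java
public class ZooKeeperServerMain {
    public static void main(String[] args) {
        ZooKeeperServerMain main = new ZooKeeperServerMain();
        try {
            main.initializeAndRun(args);
        } catch (Exception e) {          // the real code distinguishes several error cases
            e.printStackTrace();
            System.exit(1);
        }
    }

    protected void initializeAndRun(String[] args) throws Exception {
        // logging/JMX setup happens first in the real code
        ServerConfig config = new ServerConfig();
        if (args.length == 1) {
            config.parse(args[0]);       // 1. parse zoo.cfg
        } else {
            config.parse(args);
        }
        runFromConfig(config);           // 2. build and start the standalone server
                                         //    (creates ZooKeeperServer, ServerCnxnFactory, ...)
    }
}
```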
3. Leader election
Concepts
- External vote: a vote sent over by another server
- Internal vote: the server's own current vote
- Election round: the round of the ZooKeeper Leader election, i.e. the logical clock
- PK: comparing an external vote against the internal vote to decide whether the internal vote needs to change
- sendqueue: the vote send queue, holding votes waiting to be sent
- recvqueue: the vote receive queue, holding the external votes that have been received
//====================
Leader election at server startup
1) Each server sends out a vote (initially for itself)
2) Each server receives the votes from the other servers
3) The votes are processed (the PK):
- The ZXID is compared first: the server with the larger ZXID is preferred as Leader
- If the ZXIDs are equal, the myids are compared: the server with the larger myid becomes the Leader
(For example, in a fresh three-server ensemble where servers 1 and 2 start first, the ZXIDs are equal, so both end up voting for server 2, which reaches the quorum and becomes Leader.)
4) The votes are tallied
5) Each server changes its state
Leader election while the cluster is running
1) Change state
2) Each server sends out a vote
3) Receive the votes from the other servers — the same as at startup
4) Process the votes
5) Tally the votes
6) Change server state
```java
public interface Election {
    // look for the Leader
    public Vote lookForLeader() throws InterruptedException;
    // stop
    public void shutdown();
}
```
FastLeaderElection has three important inner classes:
1. Notification
Represents a received election vote (the vote information sent over by another server).
```java
/*
 * Notification represents a received vote (sent by another server). It carries the
 * proposed leader's id, zxid, election epoch, and so on; its buildMsg method packs
 * the election info into a ByteBuffer before it is sent.
 */
static public class Notification {
    /*
     * Format version, introduced in 3.4.6
     */
    public final static int CURRENTVERSION = 0x2;

    int version;

    /*
     * Proposed leader
     */
    // id of the proposed leader
    long leader;

    /*
     * zxid of the proposed leader
     */
    // transaction id of the proposed leader
    long zxid;

    /*
     * Epoch
     */
    // election epoch of the sender
    long electionEpoch;

    /*
     * current state of sender
     */
    // state of the sender
    QuorumPeer.ServerState state;

    /*
     * Address of sender
     */
    // id of the sender
    long sid;

    QuorumVerifier qv;

    /*
     * epoch of the proposed leader
     */
    // election epoch of the proposed leader
    long peerEpoch;
}
```
2. ToSend
Represents the election vote information sent to the other servers.
```java
/*
 * ToSend represents a vote sent to the other servers; it likewise carries the
 * proposed leader's id, zxid, election epoch, and so on.
 */
static public class ToSend {
    static enum mType {crequest, challenge, notification, ack}

    ToSend(mType type,
           long leader,
           long zxid,
           long electionEpoch,
           ServerState state,
           long sid,
           long peerEpoch,
           byte[] configData) {
        this.leader = leader;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.state = state;
        this.sid = sid;
        this.peerEpoch = peerEpoch;
        this.configData = configData;
    }

    /*
     * Proposed leader in the case of notification
     */
    // id of the proposed leader
    long leader;

    /*
     * id contains the tag for acks, and zxid for notifications
     */
    // highest transaction id of the proposed leader
    long zxid;

    /*
     * Epoch
     */
    // election epoch of the sender
    long electionEpoch;

    /*
     * Current state;
     */
    // state of the sender
    QuorumPeer.ServerState state;

    /*
     * Address of recipient
     */
    // id of the recipient
    long sid;

    /*
     * Used to send a QuorumVerifier (configuration info)
     */
    byte[] configData = dummyData;

    /*
     * Leader epoch
     */
    // election epoch of the proposed leader
    long peerEpoch;
}

LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
```
3. Messenger
```java
protected class Messenger {

    /*
     * Receives messages from instance of QuorumCnxManager on
     * method run(), and processes such messages.
     */
    class WorkerReceiver extends ZooKeeperThread {
        // whether to stop
        volatile boolean stop;
        // manages the connections between servers
        QuorumCnxManager manager;

        WorkerReceiver(QuorumCnxManager manager) {
            super("WorkerReceiver");
            this.stop = false;
            this.manager = manager;
        }

        public void run() {
            // the response message
            Message response;
            while (!stop) {   // loop until stopped
                // Sleeps on receive
                try {
                    // take one vote from the receive queue (sent by another server)
                    response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                    // skip if there is no vote
                    if (response == null) continue;

                    // The current protocol and two previous generations all send at least 28 bytes
                    if (response.buffer.capacity() < 28) {
                        LOG.error("Got a short response: " + response.buffer.capacity());
                        continue;
                    }

                    // this is the backwardCompatibility mode in place before ZK-107
                    // It is for a version of the protocol in which we didn't send peer epoch
                    // With peer epoch and version the message became 40 bytes
                    boolean backCompatibility28 = (response.buffer.capacity() == 28);

                    // this is the backwardCompatibility mode for no version information
                    boolean backCompatibility40 = (response.buffer.capacity() == 40);

                    response.buffer.clear();

                    // Instantiate Notification and set its attributes
                    Notification n = new Notification();

                    int rstate = response.buffer.getInt();
                    long rleader = response.buffer.getLong();
                    long rzxid = response.buffer.getLong();
                    long relectionEpoch = response.buffer.getLong();
                    long rpeerepoch;

                    int version = 0x0;
                    if (!backCompatibility28) {
                        rpeerepoch = response.buffer.getLong();
                        if (!backCompatibility40) {
                            /*
                             * Version added in 3.4.6
                             */
                            version = response.buffer.getInt();
                        } else {
                            LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
                        }
                    } else {
                        LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
                        rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                    }

                    QuorumVerifier rqv = null;

                    // check if we have a version that includes config. If so extract config info from message.
                    if (version > 0x1) {
                        int configLength = response.buffer.getInt();
                        byte b[] = new byte[configLength];
                        response.buffer.get(b);

                        synchronized (self) {
                            try {
                                rqv = self.configFromString(new String(b));
                                QuorumVerifier curQV = self.getQuorumVerifier();
                                if (rqv.getVersion() > curQV.getVersion()) {
                                    LOG.info("{} Received version: {} my version: {}", self.getId(),
                                            Long.toHexString(rqv.getVersion()),
                                            Long.toHexString(self.getQuorumVerifier().getVersion()));
                                    if (self.getPeerState() == ServerState.LOOKING) {
                                        LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
                                        self.processReconfig(rqv, null, null, false);
                                        if (!rqv.equals(curQV)) {
                                            LOG.info("restarting leader election");
                                            self.shuttingDownLE = true;
                                            self.getElectionAlg().shutdown();
                                            break;
                                        }
                                    } else {
                                        LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
                                    }
                                }
                            } catch (IOException e) {
                                LOG.error("Something went wrong while processing config received from {}", response.sid);
                            } catch (ConfigException e) {
                                LOG.error("Something went wrong while processing config received from {}", response.sid);
                            }
                        }
                    } else {
                        LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
                    }

                    /*
                     * If it is from a non-voting server (such as an observer or
                     * a non-voting follower), respond right away.
                     */
                    if (!validVoter(response.sid)) {
                        // get our own current vote
                        Vote current = self.getCurrentVote();
                        QuorumVerifier qv = self.getQuorumVerifier();
                        // build the message
                        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                current.getId(),
                                current.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                response.sid,
                                current.getPeerEpoch(),
                                qv.toString().getBytes());
                        // put it on the queue to be sent
                        sendqueue.offer(notmsg);
                    } else {
                        // a vote from a voting server was received
                        // Receive new message
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Receive new notification message. My id = " + self.getId());
                        }

                        // State of peer that sent this message
                        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                        switch (rstate) {   // decode the sender's state
                        case 0:
                            ackstate = QuorumPeer.ServerState.LOOKING;
                            break;
                        case 1:
                            ackstate = QuorumPeer.ServerState.FOLLOWING;
                            break;
                        case 2:
                            ackstate = QuorumPeer.ServerState.LEADING;
                            break;
                        case 3:
                            ackstate = QuorumPeer.ServerState.OBSERVING;
                            break;
                        default:
                            continue;
                        }

                        n.leader = rleader;
                        n.zxid = rzxid;
                        // election epoch of the sender
                        n.electionEpoch = relectionEpoch;
                        n.state = ackstate;
                        // server id of the sender
                        n.sid = response.sid;
                        n.peerEpoch = rpeerepoch;
                        n.version = version;
                        n.qv = rqv;

                        /*
                         * Print notification info
                         */
                        if (LOG.isInfoEnabled()) {
                            printNotification(n);
                        }

                        /*
                         * If this server is looking, then send proposed leader
                         */
                        if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                            recvqueue.offer(n);

                            /*
                             * Send a notification back if the peer that sent this
                             * message is also looking and its logical clock is
                             * lagging behind.
                             */
                            if ((ackstate == QuorumPeer.ServerState.LOOKING)
                                    && (n.electionEpoch < logicalclock.get())) {
                                Vote v = getVote();
                                QuorumVerifier qv = self.getQuorumVerifier();
                                ToSend notmsg = new ToSend(ToSend.mType.notification,
                                        v.getId(),
                                        v.getZxid(),
                                        logicalclock.get(),
                                        self.getPeerState(),
                                        response.sid,
                                        v.getPeerEpoch(),
                                        qv.toString().getBytes());
                                // put the message on the queue to be sent
                                sendqueue.offer(notmsg);
                            }
                        } else {
                            /*
                             * If this server is not looking, but the one that sent the ack
                             * is looking, then send back what it believes to be the leader.
                             */
                            // get the current vote
                            Vote current = self.getCurrentVote();
                            if (ackstate == QuorumPeer.ServerState.LOOKING) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                                            self.getId(),
                                            response.sid,
                                            Long.toHexString(current.getZxid()),
                                            current.getId(),
                                            Long.toHexString(self.getQuorumVerifier().getVersion()));
                                }

                                QuorumVerifier qv = self.getQuorumVerifier();
                                ToSend notmsg = new ToSend(
                                        ToSend.mType.notification,
                                        current.getId(),
                                        current.getZxid(),
                                        current.getElectionEpoch(),
                                        self.getPeerState(),
                                        response.sid,
                                        current.getPeerEpoch(),
                                        qv.toString().getBytes());
                                // put the message on the queue to be sent
                                sendqueue.offer(notmsg);
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted Exception while waiting for new message" + e.toString());
                }
            }
            LOG.info("WorkerReceiver is down");
        }
    }

    /*
     * This worker simply dequeues a message to send and
     * and queues it on the manager's queue.
     */
    class WorkerSender extends ZooKeeperThread {
        // whether to stop
        volatile boolean stop;
        // manages the connections between servers
        QuorumCnxManager manager;

        WorkerSender(QuorumCnxManager manager) {
            super("WorkerSender");
            this.stop = false;
            this.manager = manager;
        }

        public void run() {
            while (!stop) {
                try {
                    // take a message from sendqueue
                    ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (m == null) continue;
                    // process it if it is not null
                    process(m);
                } catch (InterruptedException e) {
                    break;
                }
            }
            LOG.info("WorkerSender is down");
        }

        /*
         * Called by run() once there is a new message to send.
         *
         * @param m     message to send
         */
        void process(ToSend m) {
            // build the message
            ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                    m.leader,
                    m.zxid,
                    m.electionEpoch,
                    m.peerEpoch,
                    m.configData);
            // send it
            manager.toSend(m.sid, requestBuffer);
        }
    }

    WorkerSender ws;
    WorkerReceiver wr;
    Thread wsThread = null;
    Thread wrThread = null;

    /*
     * Constructor of class Messenger.
     *
     * @param manager   Connection manager
     */
    Messenger(QuorumCnxManager manager) {
        // create the WorkerSender
        this.ws = new WorkerSender(manager);
        // wrap it in a new thread
        this.wsThread = new Thread(this.ws,
                "WorkerSender[myid=" + self.getId() + "]");
        // daemon thread
        this.wsThread.setDaemon(true);

        // create the WorkerReceiver
        this.wr = new WorkerReceiver(manager);
        this.wrThread = new Thread(this.wr,
                "WorkerReceiver[myid=" + self.getId() + "]");
        // daemon thread
        this.wrThread.setDaemon(true);
    }

    /*
     * Starts instances of WorkerSender and WorkerReceiver
     */
    void start() {
        this.wsThread.start();
        this.wrThread.start();
    }

    /*
     * Stops instances of WorkerSender and WorkerReceiver
     */
    void halt() {
        this.ws.stop = true;
        this.wr.stop = true;
    }
}
```
Class fields
```java
// logger
private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);

/*
 * Determine how much time a process has to wait
 * once it believes that it has reached the end of
 * leader election.
 */
// wait time to finalize the leader election
final static int finalizeWait = 200;

/*
 * Upper bound on the amount of time between two consecutive
 * notification checks. This impacts the amount of time to get
 * the system up again after long partitions. Currently 60 seconds.
 */
// maximum interval between two consecutive notification checks
final static int maxNotificationInterval = 60000;

/*
 * This value is passed to the methods that check the quorum
 * majority of an established ensemble for those values that
 * should not be taken into account in the comparison
 * (electionEpoch and zxid).
 */
final static int IGNOREVALUE = -1;

/*
 * Connection manager. Fast leader election uses TCP for
 * communication between peers, and QuorumCnxManager manages
 * such connections.
 */
// manages the connections between servers
QuorumCnxManager manager;

// vote send queue: holds the votes waiting to be sent
LinkedBlockingQueue<ToSend> sendqueue;

// vote receive queue: holds the received external votes
LinkedBlockingQueue<Notification> recvqueue;

// the voter itself
QuorumPeer self;

Messenger messenger;

// logical clock, i.e. the election round
AtomicLong logicalclock = new AtomicLong(); /* Election instance */

// id of the proposed leader
long proposedLeader;
// zxid of the proposed leader
long proposedZxid;
// election epoch of the proposed leader
long proposedEpoch;

// whether to stop the election
volatile boolean stop;
```
4. sendNotifications
```java
private void sendNotifications() {
    // iterate over the set of voting participants
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        // build the message
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                    Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
                    " (n.round), " + sid + " (recipient), " + self.getId() +
                    " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // put it on the queue to be sent
        sendqueue.offer(notmsg);
    }
}
```
5. totalOrderPredicate
This method PKs a received vote against the server's own vote: it decides whether the server proposed in the message is better, comparing by epoch first, then zxid, then server id.
```java
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    // a server whose quorum weight is 0 can never win
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *    as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
                    ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
```
6. termPredicate
Determines whether the Leader election can end: has more than half of the servers voted for the same Leader?
```java
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        // collect every ballot that equals the current vote
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }
    // check whether the votes for this candidate exceed half of the quorum
    return voteSet.hasAllQuorums();
}
```
Additional Concepts
1. More on zxid
Every operation that changes a ZooKeeper node's state gives the node a timestamp in Zxid format, and these timestamps are globally ordered: each change to a node produces a unique Zxid.
cZxid
The zxid of the operation that created the node (Create).
mZxid
The zxid of the most recent modification of the node itself (Modify).
Changes to its children do not affect it. A small sketch for reading these fields follows.
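A sketch reading these fields through the Java API, reusing the `zooKeeper` handle from section 3.2 (the path is an assumption):

```java
Stat stat = zooKeeper.exists("/my-persistent", false);
if (stat != null) {
    System.out.println("cZxid = 0x" + Long.toHexString(stat.getCzxid())); // set when the node was created
    System.out.println("mZxid = 0x" + Long.toHexString(stat.getMzxid())); // last change to the node's own data
    System.out.println("pZxid = 0x" + Long.toHexString(stat.getPzxid())); // last change to the node's children
}
```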
Problems encountered
The jars under the project's Build output have to be added to the project as dependencies.
References for the assignment
https://segmentfault.com/a/1190000019670015
https://blog.csdn.net/sqh201030412/article/details/51446434
Vocabulary
subscribe — to sign up for updates
Access Control List (ACL)
latch — a fastening; cf. CountDownLatch
election — the act of choosing by vote
notification — a message about an event
extract — to pull out
verifier — one who checks, as in QuorumVerifier
vote — a ballot
quorum — a majority
predicate — an assertion, a condition
epoch — an era, a period
interval — a gap, a range
apropos — concerning
logical — as in "logical clock"
policy — a rule, as in RetryPolicy
retry — to try again
exponential — growing by a fixed ratio, as in ExponentialBackoffRetry
guarantee — a promise
tribute — homage