Ⅵ: ZooKeeper's Watcher Event-Listening Mechanism
ZooKeeper series (2021 edition)
Ⅰ: ZooKeeper single-node installation, a detailed tutorial: https://blog.csdn.net/Kevinnsm/article/details/116134397?spm=1001.2014.3001.5501
Ⅱ: ZooKeeper shell commands: https://blog.csdn.net/Kevinnsm/article/details/116137602?spm=1001.2014.3001.5501
Ⅲ: Viewing node status information in ZooKeeper: https://blog.csdn.net/Kevinnsm/article/details/116143218?spm=1001.2014.3001.5501
Ⅳ: ZooKeeper ACL permission control: https://blog.csdn.net/Kevinnsm/article/details/116167394?spm=1001.2014.3001.5501
Ⅴ: The ZooKeeper Java API: https://blog.csdn.net/Kevinnsm/article/details/116462557?spm=1001.2014.3001.5501
Ⅵ: ZooKeeper's Watcher event-listening mechanism: https://blog.csdn.net/Kevinnsm/article/details/116501842?spm=1001.2014.3001.5501
Ⅶ: Using ZooKeeper as a configuration center for services: https://blog.csdn.net/Kevinnsm/article/details/116542974?spm=1001.2014.3001.5501
Table of contents
- Preliminaries: get a grip on the Watcher flow
- 1. Checking the watcher's connection state
- 2. exists under the watcher mechanism
  - Ⅰ. The connection object's watcher
  - Ⅱ. Custom watcher
  - Ⅲ. Watching repeatedly
  - Ⅳ. Multiple watchers on the same node
- 3. getData under the watcher mechanism
  - Ⅰ. The connection object's watcher
  - Ⅱ. Custom watcher
  - Ⅲ. Watching repeatedly
  - Ⅳ. Multiple watchers on the same node
- 4. getChildren under the watcher mechanism
  - Ⅰ. The connection object's watcher
  - Ⅱ. Custom watcher
  - Ⅲ. Watching repeatedly
  - Ⅳ. Multiple watchers on the same node
The results are demonstrated over an Xshell 7 connection to a cloud server; if that setup is unfamiliar, see Part Ⅰ.
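Preliminaries: get a grip on the Watcher flow
1. Connect to the ZooKeeper server.
2. While connecting, make the current thread wait until another thread has finished establishing the connection to the ZooKeeper service (implemented with a counter, CountDownLatch).
3. The callback function process is executed.
4. The current thread is released.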
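The four steps above boil down to a latch handshake between the calling thread and the client's event thread. Here is a minimal sketch of that handshake using only the JDK, with no ZooKeeper dependency: a plain `Thread` stands in for the event thread, and `LatchHandshake`, `connectAndWait`, and the simulated delay are illustrative names I made up, not part of the ZooKeeper API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the connect-and-wait flow; not the ZooKeeper client.
class LatchHandshake {

    // Returns true if the simulated "connected" callback ran before the timeout.
    static boolean connectAndWait(long connectDelayMs, long timeoutMs) {
        CountDownLatch connected = new CountDownLatch(1);
        AtomicBoolean callbackRan = new AtomicBoolean(false);

        // Stand-in for the event thread: it "connects", then runs the callback.
        Thread eventThread = new Thread(() -> {
            try {
                Thread.sleep(connectDelayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            callbackRan.set(true);  // step 3: the callback (process) runs
            connected.countDown();  // step 4: release the waiting thread
        });
        eventThread.setDaemon(true);
        eventThread.start();        // step 1: start connecting

        try {
            // Step 2: block the caller until the callback fires or the timeout elapses.
            boolean released = connected.await(timeoutMs, TimeUnit.MILLISECONDS);
            return released && callbackRan.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(connectAndWait(50, 2000));
    }
}
```

The sketch uses the timed `await(timeout, unit)` so that the caller does not hang forever if the "connection" never completes; the examples in this article use the untimed `await()`, which is simpler but blocks indefinitely when the server is unreachable.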
1. Checking the watcher's connection state

    package com.zookeeper.watcher;

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    /**
     * @author: 抱着鱼睡觉的喵喵
     * @date: 2021/5/7
     * @description:
     */
    public class WatcherConnection implements Watcher {

        // Latch that makes the current thread wait until another thread is done
        static CountDownLatch countDownLatch = new CountDownLatch(1);
        static ZooKeeper zooKeeper;

        public static void main(String[] args) {
            try {
                // Connect to the ZooKeeper service
                zooKeeper = new ZooKeeper("8.140.37.103:2181", 5000, new WatcherConnection());
                // Block until the other thread (the one connecting to ZooKeeper) is done
                countDownLatch.await();
                Thread.sleep(1000);
                zooKeeper.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // Callback: inspect the connection state
        @Override
        public void process(WatchedEvent watchedEvent) {
            try {
                // Connection state changes arrive with event type None
                if (watchedEvent.getType() == Event.EventType.None) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("Connected!");
                        countDownLatch.countDown();
                    } else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("Disconnected");
                    } else if (watchedEvent.getState() == Event.KeeperState.Expired) {
                        System.out.println("Session expired");
                    } else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("Authentication failed!");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

2. exists under the watcher mechanism
Ⅰ. The connection object's watcher

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    public class WatcherExistsTest {

        private String IP = "8.140.37.103:2181";
        private ZooKeeper zookeeper;

        @Before
        public void connection() throws IOException, InterruptedException {
            // Latch that makes the current thread wait until another thread is done
            final CountDownLatch downLatch = new CountDownLatch(1);
            zookeeper = new ZooKeeper(IP, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    // Check whether the connection succeeded
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        // Bring the latch from 1 down to 0 so the main thread can continue
                        downLatch.countDown();
                        System.out.println("Connected!");
                    }
                    System.out.println(watchedEvent.getPath());
                    System.out.println(watchedEvent.getType());
                }
            });
            // The main thread waits here
            downLatch.await();
        }

        @Test
        public void watcherExists() throws KeeperException, InterruptedException {
            // First argument: the node path.
            // Second argument: true registers the connection's default watcher
            // (the one passed to the constructor) on this path; false registers none.
            zookeeper.exists("/exists", true);
            Thread.sleep(10000);
        }

        @After
        public void close() {
            try {
                zookeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Now create the /exists node from the ZooKeeper client.
The IDEA console then prints NodeCreated.
Other events, such as NodeDeleted when the node is deleted, work the same way and are not demonstrated here.
Ⅱ. Custom watcher

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    public class WatcherExistsTest {

        private String IP = "8.140.37.103:2181";
        private ZooKeeper zookeeper;

        @Before
        public void connection() throws IOException, InterruptedException {
            // Latch that makes the current thread wait until another thread is done
            final CountDownLatch downLatch = new CountDownLatch(1);
            zookeeper = new ZooKeeper(IP, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        // Bring the latch from 1 down to 0 so the main thread can continue
                        downLatch.countDown();
                    }
                }
            });
            // The main thread waits here
            downLatch.await();
        }

        @Test
        public void watcherExists2() throws KeeperException, InterruptedException {
            // Pass a dedicated watcher instead of reusing the connection's watcher
            zookeeper.exists("/exists2", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Custom watcher!");
                    System.out.println(watchedEvent.getPath());
                    System.out.println(watchedEvent.getType());
                }
            });
            Thread.sleep(10000);
            System.out.println("--------------");
        }

        @After
        public void close() {
            try {
                zookeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Run the @Test method -> create the /exists2 node from the client -> check the result in the IDEA console.
When I modified the data of /exists2, the console showed NodeDataChanged. Because this watcher fires only once, that happens in a run where /exists2 already exists when the watcher is registered.
Ⅲ. Watching repeatedly
A watcher is fundamentally one-shot: one registration yields one notification. You can, however, re-register it from inside the callback to keep watching for the lifetime of the session.

    @Test
    public void watcherExists2() throws KeeperException, InterruptedException {
        zookeeper.exists("/exists2", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    System.out.println("Custom watcher!");
                    System.out.println(watchedEvent.getPath());
                    System.out.println(watchedEvent.getType());
                    // Re-register this same watcher so the next change is reported too
                    zookeeper.exists("/exists2", this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread.sleep(10000);
        System.out.println("--------------");
    }
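To make the one-shot behaviour concrete without needing a server, here is a small in-memory simulation. It is only a sketch under my own assumptions: `OneShotWatchSim`, `watch`, `fire`, and `countEvents` are hypothetical names, and the registry is a plain map rather than anything from the ZooKeeper client. It compares a watcher that registers once with one that re-registers itself in the callback, as the test above does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for one-shot watches; not the ZooKeeper client.
class OneShotWatchSim {
    // path -> watchers waiting for the next event on that path
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Delivers one event: registrations are dropped before the callbacks run.
    void fire(String path) {
        List<Consumer<String>> pending = watches.remove(path);
        if (pending == null) {
            return; // nobody is watching any more
        }
        for (Consumer<String> watcher : pending) {
            watcher.accept(path);
        }
    }

    // Counts how many of `changes` events a single watcher gets to see.
    static int countEvents(boolean reRegister, int changes) {
        OneShotWatchSim sim = new OneShotWatchSim();
        int[] seen = {0};
        sim.watch("/exists2", new Consumer<String>() {
            @Override
            public void accept(String path) {
                seen[0]++;
                if (reRegister) {
                    sim.watch(path, this); // same idea as exists("/exists2", this)
                }
            }
        });
        for (int i = 0; i < changes; i++) {
            sim.fire("/exists2");
        }
        return seen[0];
    }
}
```

With three simulated changes, `countEvents(false, 3)` sees only the first one, while `countEvents(true, 3)` sees all three. One thing the simulation does not model: against a real server, a change that lands between the notification and the re-registration is not reported, so re-registering does not guarantee that every single change is seen.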
Ⅳ. Multiple watchers on the same node
Generally speaking, having several watcher objects is what really matches the publish-subscribe pattern: when the node changes, every registered watcher is notified.

    @Test
    public void watcherExists3() throws KeeperException, InterruptedException {
        System.out.println("============================");
        zookeeper.exists("/exists3", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Watcher 1");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        zookeeper.exists("/exists3", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Watcher 2");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        zookeeper.exists("/exists3", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Watcher 3");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        Thread.sleep(10000);
        System.out.println("==========================");
    }
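The test above registers three distinct watcher objects, which is why all three are called. As far as I know, the Java client keeps the watchers for a path in a set, so registering the same watcher object several times still yields a single call per event. The sketch below imitates that with a plain `LinkedHashSet`; `FanOutSim` and its methods are hypothetical names, and this is a simulation under that assumption, not the client's actual code.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical set-based registry illustrating fan-out to several watchers.
class FanOutSim {
    private final Set<Runnable> watchers = new LinkedHashSet<>();

    void watch(Runnable watcher) {
        watchers.add(watcher); // adding the same object twice has no effect
    }

    // Notifies every registered watcher once, then forgets them (one-shot).
    void fire() {
        Set<Runnable> pending = new LinkedHashSet<>(watchers);
        watchers.clear();
        pending.forEach(Runnable::run);
    }

    // Registers n separate watcher objects and returns how many calls one event causes.
    static int notifyDistinct(int n) {
        FanOutSim sim = new FanOutSim();
        int[] calls = {0};
        for (int i = 0; i < n; i++) {
            sim.watch(new Runnable() { // a new object on every iteration
                @Override
                public void run() {
                    calls[0]++;
                }
            });
        }
        sim.fire();
        return calls[0];
    }

    // Registers one watcher object n times and returns how many calls one event causes.
    static int notifySameObject(int n) {
        FanOutSim sim = new FanOutSim();
        int[] calls = {0};
        Runnable watcher = () -> calls[0]++;
        for (int i = 0; i < n; i++) {
            sim.watch(watcher);
        }
        sim.fire();
        return calls[0];
    }
}
```

Here `notifyDistinct(3)` results in three calls and `notifySameObject(3)` in one, which is the practical reason each subscriber in the test gets its own `new Watcher() { ... }`.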
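3. getData under the watcher mechanism
getData(String path, boolean b, Stat stat) // uses the connection object's watcher
getData(String path, Watcher watcher, Stat stat) // uses a custom watcher
Ⅰ. The connection object's watcher

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    public class WatcherGetDataTest {

        static CountDownLatch countDownLatch = new CountDownLatch(1);
        static ZooKeeper zooKeeper;
        final String IP = "8.140.37.103:2181";

        @Before
        public void before() throws IOException, InterruptedException {
            zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("=================");
                        countDownLatch.countDown();
                    }
                    System.out.println(watchedEvent.getPath());
                    System.out.println(watchedEvent.getType());
                }
            });
            countDownLatch.await();
        }

        @Test
        public void test() throws KeeperException, InterruptedException {
            // /data must already exist; otherwise getData throws KeeperException.NoNodeException.
            // true registers the connection's default watcher on /data.
            zooKeeper.getData("/data", true, null);
            Thread.sleep(10000);
            System.out.println("=======================");
        }

        @After
        public void after() throws InterruptedException {
            zooKeeper.close();
        }
    }

Start the test -> modify the data of the /data node -> check the result in the IDEA console.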
Ⅱ. Custom watcher

    @Test
    public void test2() throws KeeperException, InterruptedException {
        System.out.println("========================");
        zooKeeper.getData("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        }, null);
        Thread.sleep(10000);
        System.out.println("============================");
    }
Ⅲ. Watching repeatedly

    @Test
    public void test3() throws KeeperException, InterruptedException {
        System.out.println("=========================");
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    System.out.println(watchedEvent.getType());
                    System.out.println(watchedEvent.getPath());
                    // Re-register this watcher so the next change is reported too
                    zooKeeper.getData("/data", this, null);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        zooKeeper.getData("/data", watcher, null);
        Thread.sleep(5000);
        System.out.println("=======================");
    }
Ⅳ. Multiple watchers on the same node

    @Test
    public void test4() throws KeeperException, InterruptedException {
        System.out.println("=======================");
        zooKeeper.getData("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Watcher 1");
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        }, null);
        zooKeeper.getData("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Watcher 2");
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        }, null);
        zooKeeper.getData("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Watcher 3");
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        }, null);
        Thread.sleep(5000);
        System.out.println("========================");
    }
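4. getChildren under the watcher mechanism
getChildren(String path, boolean b) // uses the connection object's watcher
getChildren(String path, Watcher w) // uses a custom watcher
Changes to a child node's data are not detected. A getChildren watch fires when a child is created or deleted (NodeChildrenChanged) or when the watched node itself is deleted (NodeDeleted).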
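To keep the three kinds of watch apart, it helps to write down which events each registration call can deliver for the watched path, following the rules described in the ZooKeeper documentation. The lookup below is just a memory aid: `WatchTriggers`, `WatchKind`, and `EventKind` are hypothetical names that mirror, but are not, the client's `Watcher.Event.EventType` constants.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical lookup table; the enum names only mirror ZooKeeper's event types.
class WatchTriggers {
    enum WatchKind { EXISTS, GET_DATA, GET_CHILDREN }

    enum EventKind { NODE_CREATED, NODE_DELETED, NODE_DATA_CHANGED, NODE_CHILDREN_CHANGED }

    // Events that a watch of the given kind can deliver for the watched path.
    static Set<EventKind> triggers(WatchKind kind) {
        switch (kind) {
            case EXISTS:
                return EnumSet.of(EventKind.NODE_CREATED, EventKind.NODE_DELETED,
                        EventKind.NODE_DATA_CHANGED);
            case GET_DATA:
                return EnumSet.of(EventKind.NODE_DATA_CHANGED, EventKind.NODE_DELETED);
            case GET_CHILDREN:
                // Fires when a child is added or removed, or when the node itself is deleted;
                // a data change inside a child is an event on the child's own path.
                return EnumSet.of(EventKind.NODE_CHILDREN_CHANGED, EventKind.NODE_DELETED);
            default:
                throw new IllegalArgumentException("unknown watch kind: " + kind);
        }
    }
}
```

For example, `triggers(WatchKind.GET_CHILDREN)` does not contain `NODE_DATA_CHANGED`, which matches the note above: to see data changes in a child, set a getData (or exists) watch on the child's path.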
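Ⅰ. The connection object's watcher

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    public class WatcherGetChildrenTest {

        static CountDownLatch countDownLatch = new CountDownLatch(1);
        static ZooKeeper zooKeeper;
        final String IP = "8.140.37.103:2181";

        @Before
        public void before() throws IOException, InterruptedException {
            zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("=================");
                        countDownLatch.countDown();
                    }
                    System.out.println(watchedEvent.getPath());
                    System.out.println(watchedEvent.getType());
                }
            });
            countDownLatch.await();
        }

        @Test
        public void test() throws KeeperException, InterruptedException {
            // true registers the connection's default watcher on the children of /data
            zooKeeper.getChildren("/data", true);
            Thread.sleep(5000);
        }

        @After
        public void after() throws InterruptedException {
            zooKeeper.close();
        }
    }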
Ⅱ. Custom watcher

    @Test
    public void test2() throws KeeperException, InterruptedException {
        zooKeeper.getChildren("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("==================");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        Thread.sleep(10000);
        System.out.println("====================");
    }
Ⅲ. Watching repeatedly

    @Test
    public void test3() throws KeeperException, InterruptedException {
        zooKeeper.getChildren("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("================");
                // Only re-register after a child was added or removed
                if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        System.out.println(watchedEvent.getType());
                        System.out.println(watchedEvent.getPath());
                        zooKeeper.getChildren("/data", this);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        Thread.sleep(5000);
    }
Ⅳ. Multiple watchers on the same node

    @Test
    public void test4() throws KeeperException, InterruptedException {
        System.out.println("==================================");
        zooKeeper.getChildren("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Watcher 1");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        zooKeeper.getChildren("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Watcher 2");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        zooKeeper.getChildren("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("Watcher 3");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        Thread.sleep(5000);
        System.out.println("================================");
    }