006_Curator框架一
1. 為了更好地實現Java操作ZooKeeper服務器, 後來出現了非常強大的Curator框架, 目前是Apache的頂級項目。裡面提供了更多豐富的操作, 例如Session超時重連、主從選舉、分布式計數器、分布式鎖等等適用於各種複雜的ZooKeeper場景的API封裝。
2. Curator包含了幾個包
2.1. curator-framework: 對ZooKeeper的底層API的一些封裝。
2.2. curator-client: 提供一些客戶端的操作, 例如重試策略等。
2.3. curator-recipes: 封裝了一些高級特性, 如: Cache事件監聽、選舉、分布式鎖、分布式計數器、分布式Barrier等。
3. Curator框架中使用鏈式編程風格, 易讀性更強, 使用工廠方法創建連接對象。可以使用CuratorFrameworkFactory工廠創建連接, 參數1: connectString連接字符串; 參數2: retryPolicy重試連接策略; 參數3: sessionTimeoutMs會話超時時間, 默認為60000ms; 參數4: connectionTimeoutMs連接超時時間, 默認是15000ms。
4. 創建節點例子
4.1. 新建一個名為Curator的Java項目, 拷入相關jar包
4.2. 創建節點代碼
package com.fj.zkcurator;import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode;public class Create {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 創建節點try {// 4.1. creatingParentsIfNeeded()可以遞歸創建節點// 4.2. withMode(CreateMode.PERSISTENT)創建持久化節點String result = cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/root/child", "I am root first child.".getBytes());System.out.println("創建結果: " + result);} catch (Exception e1) {e1.printStackTrace();}// 5. 關閉連接cf.close();} }4.3.?運行結果
5. 異步創建節點例子
5.1. 異步創建節點
package com.fj.zkcurator;import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode;public class CreateInBackground {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final CountDownLatch cdl = new CountDownLatch(1);public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 創建節點// 4.1. 線程池ExecutorService pool = Executors.newCachedThreadPool();try {// 4.2. creatingParentsIfNeeded()可以遞歸創建節點// 4.3. withMode(CreateMode.PERSISTENT)創建持久化節點// 4.4. inBackground異步后臺創建節點cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {// 異步執行完成, 發送信號量, 讓后續阻塞程序能夠繼續向下執行cdl.countDown();System.out.println("resultCode: " + ce.getResultCode());System.out.println("type: " + ce.getType());}}, pool).forPath("/root/child01", "I am root first child.".getBytes());// 進行阻塞cdl.await();} catch (Exception e1) {e1.printStackTrace();}// 5. 關閉連接cf.close();} }5.2. 運行結果
6. 獲取子節點例子
6.1. 獲取子節點
package com.fj.zkcurator;import java.util.List; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry;public class GetChildren {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 獲取某路徑的子節點try {List<String> children = cf.getChildren().forPath("/root");for (String child : children) {System.out.println(child);}} catch (Exception e) {e.printStackTrace();}// 5. 關閉連接cf.close();} }6.2. 運行結果
7. 獲取和設置值例子
7.1. 獲取和設置值
package com.fj.zkcurator;import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.data.Stat;public class GetSetData {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 設置和獲取節點數據try {// 4.1. 設置節點數據byte[] data = cf.getData().forPath("/root/child01");System.out.println(new String(data));// 4.2. 設置節點數據, 返回屬性信息Stat stat = cf.setData().forPath("/root/child01", "I am root first child. modify 006".getBytes());System.out.println("czxid: " + stat.getCzxid() + ", ctime: " + stat.getCtime() + ", cversion: " + stat.getCversion());System.out.println("mzxid: " + stat.getMzxid() + ", mtime: " + stat.getMtime() + ", pzxid: " + stat.getPzxid());System.out.println("version: " + stat.getVersion() + ", dataLength: " + stat.getDataLength() + ", aversion: " + stat.getAversion());System.out.println("numChildren: "+ stat.getNumChildren() + ", ephemeralOwner: " + stat.getEphemeralOwner());} catch (Exception e1) {e1.printStackTrace();}// 5. 關閉連接cf.close();} }7.2. 運行結果
8. 檢查節點是否存在和刪除節點例子
8.1. 檢查節點是否存在和刪除節點
package com.fj.zkcurator;import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.data.Stat;public class CheckExistsDelete {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 刪除節點try {// 4.1. 檢查節點是否存在Stat stat = cf.checkExists().forPath("/root/child01");if(stat == null) {System.out.println("/root/child01不存在.");}if(stat != null) {System.out.println("開始刪除/root/child01節點.");// 4.2. deletingChildrenIfNeeded()遞歸刪除節點cf.delete().deletingChildrenIfNeeded().forPath("/root/child01");if(cf.checkExists().forPath("/root/child01") == null) {System.out.println("/root/child01節點刪除成功.");}}} catch (Exception e) {e.printStackTrace();}// 5. 關閉連接cf.close();} }8.2. 運行結果
9. 我們使用CuratorCache的方式在客戶端實例中註冊一個監聽緩存, 然後實現對應的監聽方法, 監聽方式為CuratorCacheListener。
10. 監聽緩存例子
10.1. 監聽緩存
package com.fj.zkcurator;import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.CuratorCache; import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.CuratorCacheListenerBuilder.ChangeListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode;public class CuratorCacheCuratorCacheListener {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 15 * 1000;public static final String PATH = "/curatorCache";public static final ThreadLocalRandom random = ThreadLocalRandom.current();public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 創建CuratorCacheListenerCuratorCacheListener listener = CuratorCacheListener.builder().forCreates(new Consumer<ChildData>() {@Overridepublic void accept(ChildData node) {System.out.println(String.format("%s created. value: [%s]", node.getPath(), new String(node.getData())));}}).forChanges(new ChangeListener() {@Overridepublic void event(ChildData oldNode, ChildData node) {System.out.println(String.format("%s changed. 
oldValue: [%s] newValue: [%s]", node.getPath(),new String(oldNode.getData()), new String(node.getData())));}}).forDeletes(new Consumer<ChildData>() {@Overridepublic void accept(ChildData oldNode) {System.out.println(String.format("%s deleted. oldValue: [%s]", oldNode.getPath(), new String(oldNode.getData())));}}).forInitialized(new Runnable() {@Overridepublic void run() {System.out.println("\r\nCache initialized.");}}).build();// 5. 創建CuratorCacheCuratorCache cache = CuratorCache.build(cf, PATH);// 6. 注冊監聽cache.listenable().addListener(listener);// 7. 啟動cachecache.start();// 8. 創建、刪除和設置節點try {for (int i = 0; i < 10; ++i) {int depth = random.nextInt(1, 3);String path = makeRandomPath(random, depth);System.out.println("\r\npath = " + path);if(nodeExist(cf, path)) {if(random.nextBoolean()) {cf.setData().forPath(path, UUID.randomUUID().toString().getBytes());} else {cf.delete().deletingChildrenIfNeeded().forPath(path);}} else {String result = cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, UUID.randomUUID().toString().getBytes());System.out.println("result = " + result);}Thread.sleep(5000);}} catch (Exception e) {e.printStackTrace();}// 9. 關閉連接cf.close();}private static boolean nodeExist(CuratorFramework cf, String path) throws Exception {return cf.checkExists().forPath(path) != null ? true : false;}private static String makeRandomPath(ThreadLocalRandom random, int depth) {if (depth == 0) {return PATH;}return makeRandomPath(random, depth - 1) + "/" + random.nextInt(3);} }10.2. 運行結果
11. 重復注冊例子
11.1. 監聽
package com.fj.zkcurator.scene;import java.util.function.Consumer; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.CuratorCache; import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.CuratorCacheListenerBuilder.ChangeListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode;public class CacheWather {private static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";private static final int sessionTimeoutMs = 10 * 60 * 1000;private static final int connectionTimeoutMs = 15 * 1000;private static final String PATH = "/superCuratorCache";private CuratorFramework cf = null;public CacheWather() {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 創建CuratorCacheListenerCuratorCacheListener listener = CuratorCacheListener.builder().forCreates(new Consumer<ChildData>() {@Overridepublic void accept(ChildData node) {System.out.println(String.format("%s created. value: [%s]", node.getPath(), new String(node.getData())));}}).forChanges(new ChangeListener() {@Overridepublic void event(ChildData oldNode, ChildData node) {System.out.println(String.format("%s changed. oldValue: [%s] newValue: [%s]", node.getPath(),new String(oldNode.getData()), new String(node.getData())));}}).forDeletes(new Consumer<ChildData>() {@Overridepublic void accept(ChildData oldNode) {System.out.println(String.format("%s deleted. 
oldValue: [%s]", oldNode.getPath(), new String(oldNode.getData())));}}).forInitialized(new Runnable() {@Overridepublic void run() {System.out.println("\r\nCache initialized.");}}).build();// 5. 創建CuratorCacheCuratorCache cache = CuratorCache.build(cf, PATH);// 6. 注冊監聽cache.listenable().addListener(listener);// 7. 啟動cachecache.start();// 8. 如果緩存節點不存在就創建try {if(cf.checkExists().forPath(PATH) == null) {cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(PATH, "init".getBytes());}} catch (Exception e) {e.printStackTrace();}}public void close() {if(cf != null) {cf.close();}} }11.2. UserApp
package com.fj.zkcurator.scene;

/**
 * Demo application that starts a {@link CacheWather}, stays alive for five
 * minutes so that node events can be observed, then closes the watcher.
 */
public class UserApp {

    /**
     * Starts the watcher, announces start-up, sleeps and shuts down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CacheWather watcher = new CacheWather();
        System.out.println("UserApp啟動成功...");

        // Five minutes, in milliseconds.
        final long keepAliveMs = 5 * 60 * 1000;
        try {
            Thread.sleep(keepAliveMs);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }

        watcher.close();
    }
}
11.3. PayApp
package com.fj.zkcurator.scene;

/**
 * Demo application that starts a {@link CacheWather}, stays alive for five
 * minutes so that node events can be observed, then closes the watcher.
 */
public class PayApp {

    /**
     * Starts the watcher, announces start-up, waits and shuts down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CacheWather ca = new CacheWather();
        System.out.println("PayApp啟動成功...");
        pause(5 * 60 * 1000);
        ca.close();
    }

    /** Sleeps for the given number of milliseconds; an interrupt is only reported. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
11.4. OperationNode
package com.fj.zkcurator.scene;import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat;public class OperationNode {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 節點操作try {String r1 = cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/superCuratorCache/child01", "child01.".getBytes());System.out.println("創建結果: " + r1);Thread.sleep(1000);String r2 = cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/superCuratorCache/child02", "child02.".getBytes());System.out.println("創建結果: " + r2);Thread.sleep(1000);cf.delete().deletingChildrenIfNeeded().forPath("/superCuratorCache/child02");Thread.sleep(1000);Stat stat = cf.setData().forPath("/superCuratorCache/child01", "modify child01 data.".getBytes());System.out.println("czxid: " + stat.getCzxid() + ", ctime: " + stat.getCtime() + ", cversion: " + stat.getCversion());System.out.println("mzxid: " + stat.getMzxid() + ", mtime: " + stat.getMtime() + ", pzxid: " + stat.getPzxid());System.out.println("version: " + stat.getVersion() + ", dataLength: " + stat.getDataLength() + ", aversion: " + stat.getAversion());System.out.println("numChildren: "+ stat.getNumChildren() + ", ephemeralOwner: " + 
stat.getEphemeralOwner());} catch (Exception e1) {e1.printStackTrace();}// 5. 關閉連接cf.close();} }11.5. 運行UserApp
11.6. 運行PayApp
11.7. 運行OperationNode
11.8. UserApp監聽到節點變化
11.9. PayApp監聽到節點變化
11.10. 再次運行UserApp
11.11. 再次運行PayApp
總結
以上是生活随笔為你收集整理的006_Curator框架一的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 005_Java操作ZooKeeper
- 下一篇: 007_Curator框架二