Curator操作ZooKeeper
Curator極大簡化了ZooKeeper的使用,增加了針對ZooKeeper集群中connection的管理。
節(jié)點的創(chuàng)建和刪除
?
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//會話超時時間,默認為60000,單位:ms static final int CONNECTION_TIMEOUT=60000;//連接超時時間,默認為15000,單位:ms public static void main(String[] args) throws Exception {//重試策略:初試時間為10s,最大重試次數(shù)為20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//創(chuàng)建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//開啟連接 cf.start();//建立節(jié)點 指定節(jié)點類型(不加withMode默認為持久類型節(jié)點)、路徑、數(shù)據(jù)內容cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());Thread.sleep(30000);//刪除節(jié)點cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent");cf.close();} }run as--java application
線程休眠30s后,執(zhí)行節(jié)點刪除操作
?
節(jié)點內容的修改
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//會話超時時間,默認為60000,單位:ms static final int CONNECTION_TIMEOUT=60000;//連接超時時間,默認為15000,單位:ms public static void main(String[] args) throws Exception {//重試策略:初試時間為10s,最大重試次數(shù)為20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//創(chuàng)建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//開啟連接 cf.start();//創(chuàng)建節(jié)點cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());//cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p2","p2 value".getBytes());//讀取節(jié)點String ret1 = new String(cf.getData().forPath("/persistent/p1"));System.out.println(ret1);//修改節(jié)點cf.setData().forPath("/persistent/p1", "new p1 value".getBytes());String ret2 = new String(cf.getData().forPath("/persistent/p1"));System.out.println(ret2);cf.close();} }Eclipse的console輸出
Eclipse的ZooKeeper Explorer內容
?
節(jié)點操作的回調函數(shù)
節(jié)點的新增、修改、刪除,都可以設置其回調函數(shù)。該回調函數(shù)可以輸出服務器的狀態(tài)碼、服務器事件類型等內容。還可以加入一個線程池進行優(yōu)化操作。在批量節(jié)點操作的時候,可以用線程池去規(guī)劃callback,可以將很多的任務放到隊列中,使用線程池中的線程將隊列中的任務進行處理。線程池中線程的個數(shù)可以根據(jù)具體的機器配置而定。
下面代碼中,節(jié)點的創(chuàng)建操作是一個異步的過程,不會阻塞主線程main的執(zhí)行,代碼中將主線程main休眠,子線程在執(zhí)行完節(jié)點的創(chuàng)建操作后執(zhí)行回調函數(shù)并輸出相關內容。若不添加主線程休眠的代碼,則主線程執(zhí)行完代碼后結束,此時節(jié)點創(chuàng)建的子線程還沒有完成節(jié)點的創(chuàng)建,因main線程的結束子線程也結束,進而就不能完成節(jié)點創(chuàng)建和回調函數(shù)的執(zhí)行。
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//會話超時時間,默認為60000,單位:ms static final int CONNECTION_TIMEOUT=60000;//連接超時時間,默認為15000,單位:ms public static void main(String[] args) throws Exception {//重試策略:初試時間為10s,最大重試次數(shù)為20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//創(chuàng)建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//開啟連接 cf.start();// 綁定回調函數(shù)ExecutorService pool = Executors.newCachedThreadPool();cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {System.out.println("code:" + ce.getResultCode());System.out.println("type:" + ce.getType());System.out.println("線程為:" + Thread.currentThread().getName());}}, pool).forPath("/persistent/p2","p2 value".getBytes());System.out.println("主線程:"+Thread.currentThread().getName());Thread.sleep(Integer.MAX_VALUE);cf.close();} }Eclipse中console輸出
ZooKeeper Explorer中內容
獲取子節(jié)點
?
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//會話超時時間,默認為60000,單位:ms static final int CONNECTION_TIMEOUT=60000;//連接超時時間,默認為15000,單位:ms public static void main(String[] args) throws Exception {//重試策略:初試時間為10s,最大重試次數(shù)為20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//創(chuàng)建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//開啟連接 cf.start();// 綁定回調函數(shù)ExecutorService pool = Executors.newCachedThreadPool();cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {System.out.println("code:" + ce.getResultCode());System.out.println("type:" + ce.getType());System.out.println("線程為:" + Thread.currentThread().getName());}}, pool).forPath("/persistent/p2","p2 value".getBytes());System.out.println("主線程:"+Thread.currentThread().getName());Thread.sleep(20000);//主線程休眠20s,等待節(jié)點創(chuàng)建完畢// 讀取子節(jié)點getChildren方法 和 判斷節(jié)點是否存在checkExists方法List<String> list = cf.getChildren().forPath("/persistent");for(String p : list){System.out.println(p);}Stat stat_p1 = cf.checkExists().forPath("/persistent/p1");System.out.println(stat_p1);Stat stat_p2 = cf.checkExists().forPath("/persistent/p2");System.out.println(stat_p2);cf.close();} }Eclipse的console輸出
若上面代碼將Thread.sleep(20000);刪除,有時會出現(xiàn)下面的異常,原因是節(jié)點創(chuàng)建和main主線程的執(zhí)行是異步的。
?
轉載于:https://www.cnblogs.com/cat520/p/9412815.html
總結
以上是生活随笔為你收集整理的Curator操作ZooKeeper的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: elasticsearch 集群no k
- 下一篇: vc++笔记十一