Zookeeper场景实践:(5)分布式通知/协调
生活随笔
收集整理的這篇文章主要介紹了
Zookeeper场景实践:(5)分布式通知/协调
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1.基本介紹
通知/協調機制通常有兩種方式。
- 系統調度模式:操作人員發送通知實際是通過控制臺改變某個節點的狀態,然后Zookeeper將這些變化發送給注冊了這個節點的Watcher的全部client。
- 工作匯報模式:這個情況是每一個工作進程都在某個文件夾下創建一個暫時節點,并攜帶工作的進度數據。
這樣匯總的進程能夠監控文件夾子節點的變化獲得工作進度的實時的全局情況。
總的來說,利用Zookeeper的watcher注冊和異步通知功能,通知的發送者創建一個節點。并將通知的數據寫入的該節點;通知的接受者則對該節點注冊watch,當節點變化時,就算作通知的到來。
場景實踐
通過上面的說明,事實上實現還是很easy的。看下關鍵的幾個地方:
- g_monitor_child:變量等于0標識僅僅監控節點,等于1標識監控全部子節點。
- show_notify(zh,g_path);:打印接受到的通知
- show_list(zh,g_path);:打印全部子節點的進度
再來看監控函數:
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) { //監控節點數據變化if(type == ZOO_CHANGED_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 0){show_notify(zh,g_path);//監控子節點個數變化}else if(type == ZOO_CHILD_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 1){show_list(zh,g_path);//監控子節點數據變化}else if(type == ZOO_CHANGED_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 1){show_list(zh,g_path);} }
以下是完整的代碼:
#include<stdio.h> #include<string.h> #include<unistd.h> #include"zookeeper.h" #include"zookeeper_log.h" char g_host[512]= "172.17.0.36:2181"; char g_path[512]= "/Notify"; int g_monitor_child = 0;//watch function when child list changed void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx); void show_notify(zhandle_t *zkhandle,const char *path); //show all process ip:pid void show_list(zhandle_t *zkhandle,const char *path);void print_usage(); void get_option(int argc,const char* argv[]);/**********unitl*********************/ void print_usage() {printf("Usage : [notify] [-h] [-c] [-p path][-s ip:port] \n");printf(" -h Show help\n");printf(" -p path\n");printf(" -c monitor the child nodes\n");printf(" -s zookeeper server ip:port\n");printf("For example:\n");printf("notify -s172.17.0.36:2181 -p /Notify\n"); }void get_option(int argc,const char* argv[]) {extern char *optarg;int optch;int dem = 1;const char optstring[] = "hcp:s:";while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 ){switch( optch ){case 'h':print_usage();exit(-1);case '?':print_usage();printf("unknown parameter: %c\n", optopt);exit(-1);case ':':print_usage();printf("need parameter: %c\n", optopt);exit(-1);case 'c':g_monitor_child = 1;break;case 's':strncpy(g_host,optarg,sizeof(g_host));break;case 'p':strncpy(g_path,optarg,sizeof(g_path));break;default:break;}} } void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) { /*printf("watcher event\n"); printf("type: %d\n", type); printf("state: %d\n", state); printf("path: %s\n", path); printf("watcherCtx: %s\n", (char *)watcherCtx); */if(type == ZOO_CHANGED_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 0){show_notify(zh,g_path);}else if(type == ZOO_CHILD_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 1){show_list(zh,g_path);}else if(type == ZOO_CHANGED_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 1){show_list(zh,g_path);} } void show_notify(zhandle_t *zkhandle,const char *path) {char notify_buffer[1024]={0};int notify_len = sizeof(notify_buffer);int ret = zoo_get(zkhandle,g_path,1,notify_buffer,?ify_len,NULL);if(ret != ZOK){fprintf(stderr,"failed to get the data of path %s!\n",g_path);}else{printf("Notice:%s\n",notify_buffer);} } void show_list(zhandle_t *zkhandle,const char *path) {struct String_vector children;int i = 0;int ret = zoo_get_children(zkhandle,path,1,&children);if(ret == ZOK){char child_path[512] ={0};char notify_buffer[1024] = {0};int notify_len = sizeof(notify_buffer);printf("--------------\n");for(i = 0; i < children.count; ++i){sprintf(child_path,"%s/%s",g_path,children.data[i]);ret = zoo_get(zkhandle,child_path,1,notify_buffer,?ify_len,NULL);if(ret != ZOK){fprintf(stderr,"failed to get the data of path %s!\n",child_path);}else{printf("%s:%s\n",children.data[i],notify_buffer);}}}else{fprintf(stderr,"failed to get the children of path %s!\n",path);}for(i = 0; i < children.count; ++i){free(children.data[i]);children.data[i] = NULL;} }int main(int argc, const char *argv[]) { int timeout = 30000; char path_buffer[512]; int bufferlen=sizeof(path_buffer); zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); //設置日志級別,避免出現一些其它信息 get_option(argc,argv);zhandle_t* zkhandle = zookeeper_init(g_host,zktest_watcher_g, timeout, 0, (char *)"Notify Test", 0); if (zkhandle ==NULL) { fprintf(stderr, "Error when connecting to zookeeper servers...\n"); exit(EXIT_FAILURE); } int ret = zoo_exists(zkhandle,g_path,0,NULL); if(ret != ZOK){ret = zoo_create(zkhandle,g_path,"1.0",strlen("1.0"), &ZOO_OPEN_ACL_UNSAFE,0, path_buffer,bufferlen); if(ret != ZOK){fprintf(stderr,"failed to create the path %s!\n",g_path);}else{printf("create path %s successfully!\n",g_path);}}if(ret == ZOK && g_monitor_child == 0){show_notify(zkhandle,g_path);}else if(ret == ZOK && g_monitor_child == 1){show_list(zkhandle,g_path);}getchar();zookeeper_close(zkhandle); return 0; }
程序由3個參數選項
- -s:指定Zookeeper的server的ip:port
- -p:指定要監控的路徑,默覺得/Notify
- -c:使用此項表示監控子節點列表
如 notify -s172.17.0.36:2181 -p /Notify
當你在client改動數據的時候。程序就能收到相應的通知了。
轉載于:https://www.cnblogs.com/jzdwajue/p/6772590.html
總結
以上是生活随笔為你收集整理的Zookeeper场景实践:(5)分布式通知/协调的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第三十一讲:UML类图(上)
- 下一篇: PyQt4 开发入门