zmq 接口函数之 :zmq_socket_monitor - 注册一个监控回调函数
ZeroMQ 官方地址 :http://api.zeromq.org/4-2:zmq-socket-monitor
zmq_socket_monitor(3) ?MQ?Manual?-??MQ/4.1.0
Name
zmq_socket_monitor?-?注冊一個監控回調函數
Synopsis
int zmq_socket_monitor (void *socket, char * *addr, int events);Description
zmq_socket_monitor()?函數會產生一個PAIR類型的socket,用來把socket狀態改變(事件)通過inproc://傳輸方式廣播到制定的終結點(endpoint)上。
消息包括兩個幀,第一部分包含著事件ID和與其相關聯的值。第二幀以字符串方式保存收到影響的終結點。
第一幀的組織方式是:16?bit的事件ID和32?bit的事件值。
事件ID和19:23值是按照本地的字節序排序的(即這個運行著的進程所在的機器)。在這兩部分之間沒有其它的數據。
事件值必須根據根據當前的context和事件ID進行解析。更多細節請查看下面可支持的事件。
只有方向確定的傳輸方式(tcp、ipc)才能支持此初始行為。
支持的事件
ZMQ_EVENT_CONNECTED:鏈接已建立
當和遠程的另一端的連接建立好的時候,ZMQ_EVENT_CONNECTED事件會被觸發。同步和異步事件都會發生觸發此事件。事件值是新連接的socket的FD。
?
ZMQ_EVENT_CONNECT_DELAYED:同步連接失敗,仍在進行重試
當一個請求立即連接的嘗試被延遲并且仍然在嘗試的時候,此事件被觸發。事件值沒有意義。
?
ZMQ_EVENT_CONNECT_RETRIED:嘗試異步連接/重連
當一個連接嘗試被重連計時器捕獲后此事件被觸發。重連間隔根據所有的嘗試情況進行計算。事件值是重連間隔。
ZMQ_EVENT_LISTENING:socket已經綁定了某個地址,準備好接受連接請求
當一個socket成功的綁定在一個端口上的時候此事件被觸發。事件值是新綁定的socket的FD。
?
ZMQ_EVENT_BIND_FAILED:socket無法綁定在這個地址上
當一個socket無法綁定在給定的端口上時此事件被觸發。事件值是綁定函數修改后的errno值。
?
ZMQ_EVENT_ACCEPTED:連接請求被接受
一個從遠端到來的連接被一個綁定了地址的socket接受并建立了連接是會觸發此事件。事件值是被接受socket的FD。
?
ZMQ_EVENT_ACCEPT_FAILED:無法接受客戶端的連接請求
當一個連接請求試圖連接另一個socket失敗的時候會觸發此事件。事件值是accept設置的errno值。
?
ZMQ_EVENT_CLOSED:連接關閉
當一個連接的底層描述符被關閉是會觸發此事件。事件值是被關閉的socket的FD。此時這個FD已經被關閉了。
?
ZMQ_EVENT_CLOSE_FAILED:連接無法被關閉
當一個描述符無法被釋放回OS的時候會觸發此事件。注意:只對IPC?socket有效。事件值是釋放失敗時設置的errno值。
?
ZMQ_EVENT_DISCONNECTED:會話被破壞
當流引擎(尤其是TCP、IPC)出現了崩潰的/被破壞的會話時,此事件被觸發。事件值是socket的FD。
Return?value
當函數zmq_socket_monitor()?執行成功時,返回0或者更大值。否則返回?-1,并且設置errno為下列指定值。
Errors
ETERM
與被指定的socket關聯的ZMQ?context?被終結了。
EPROTONOSUPPORT
無法支持被請求的傳輸協議。監視socket必須要使用inproc://方式進行傳輸。
EINVAL
提供的地址節點不能用。
Example
監視一個REP?socket的連接狀態
#include <stdio.h> #include <zmq.h> #include <pthread.h> #include <string.h> #include <assert.h>static int read_msg(void* s, zmq_event_t* event, char* ep) {int rc ;zmq_msg_t msg1; // binary partzmq_msg_init (&msg1);zmq_msg_t msg2; // address partzmq_msg_init (&msg2);rc = zmq_msg_recv (&msg1, s, 0);if (rc == -1 && zmq_errno() == ETERM)return 1 ;assert (rc != -1);assert (zmq_msg_more(&msg1) != 0);rc = zmq_msg_recv (&msg2, s, 0);if (rc == -1 && zmq_errno() == ETERM)return 1;assert (rc != -1);assert (zmq_msg_more(&msg2) == 0);// copy binary data to event structconst char* data = (char*)zmq_msg_data(&msg1);memcpy(&(event->event), data, sizeof(event->event));memcpy(&(event->value), data+sizeof(event->event), sizeof(event->value));// copy address partconst size_t len = zmq_msg_size(&msg2) ;ep = memcpy(ep, zmq_msg_data(&msg2), len);*(ep + len) = 0 ;return 0 ; }// REP socket monitor thread static void *rep_socket_monitor (void *ctx) {zmq_event_t event;static char addr[1025] ;int rc;printf("starting monitor...\n");void *s = zmq_socket (ctx, ZMQ_PAIR);assert (s);rc = zmq_connect (s, "inproc://monitor.rep");assert (rc == 0);while (!read_msg(s, &event, addr)) {switch (event.event) {case ZMQ_EVENT_LISTENING:printf ("listening socket descriptor %d\n", event.value);printf ("listening socket address %s\n", addr);break;case ZMQ_EVENT_ACCEPTED:printf ("accepted socket descriptor %d\n", event.value);printf ("accepted socket address %s\n", addr);break;case ZMQ_EVENT_CLOSE_FAILED:printf ("socket close failure error code %d\n", event.value);printf ("socket address %s\n", addr);break;case ZMQ_EVENT_CLOSED:printf ("closed socket descriptor %d\n", event.value);printf ("closed socket address %s\n", addr);break;case ZMQ_EVENT_DISCONNECTED:printf ("disconnected socket descriptor %d\n", event.value);printf ("disconnected socket address %s\n", addr);break;}}zmq_close (s);return NULL; }int main() {const char* addr = "tcp://127.0.0.1:6666" ;pthread_t thread ;// Create the infrastructurevoid *ctx = zmq_init (1);assert (ctx);// REP socketvoid* rep = zmq_socket (ctx, ZMQ_REP);assert (rep);// REP socket monitor, all eventsint rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);assert (rc == 0);rc = pthread_create (&thread, NULL, rep_socket_monitor, ctx);assert (rc == 0);rc = zmq_bind (rep, addr);assert (rc == 0);// Allow some time for event detectionzmq_sleep (1);// Close the REP socketrc = zmq_close (rep);assert (rc == 0);zmq_term (ctx);return 0 ; }?
總結
以上是生活随笔為你收集整理的zmq 接口函数之 :zmq_socket_monitor - 注册一个监控回调函数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python gevent高并发(限制最
- 下一篇: golang 数组、指针数组、数组指针使