nng 概述
一、前言
1.1 基本介紹
NNG/nanomsg 是最近項目上使用到的一個通信庫,用來實現進程間過程調用和線程間通信,很是方便。
NNG 是 nanomsg 的繼任版本,而 nanomsg 則是流行的 ZMQ (一個簡單好用的傳輸層,像框架一樣的一個 socket library)的 C 重寫版。
NNG 將通信使用的協議和傳輸分離,同一個協議可以工作在不同的傳輸層上,類似與 TCP/IP 的應用層和傳輸層的分層,同時接口上屏蔽了底層細節,統一用字符串 URL 來描述傳輸模式。這樣當使用場景修改時,可以通過簡單修改 URL 來實現適應,極具靈活性。
同時如 NNG 描述所言 “light-weight brokerless messaging”,NNG 中的通信各方是不需要第三方程序介入的,這與 MQTT/Redis 通信需要服務器不同。這樣很適合作為通信庫來使用而沒有其他依賴。
1.2 通訊協議
- PAIR 一對一雙向通信。
- PIPELINE(PUSH/PULL) 單向通信,類似與生產者消費者模型的消息隊列。
- PUB/SUB 單向廣播。
- REQ/REP 請求-應答模式,類似與 RPC 模式。
- BUS 網狀連接通信,每個加入節點都可以發送/接受廣播消息。
- SURVEY 用于多節點表決或者服務發現。
1.3 傳輸模式
- inproc 進程內線程間傳輸
- ipc 主機內進程間傳輸
- tcp 網絡內主機間傳輸
1.4 通訊模式
通信協議里除了 PAIR 之外,基本都是一對多的通信模式,這點需要注意,以 PIPELINE 和 PUB/SUB 為例:
基于以上,多個程序是沒辦法共用一個 PUB/SUB 通道來廣播數據的,這與 ROS 里的 topic 和 LCM 中的 channel 模式不同。如果要實現類似功能,則可以使用 PIPELINE + PUB/SUB 來處理:
- 獨立一個話題發布的程序,擁有一個 PULL 和 PUB。
- PULL 約定一個 URL,所有需要發布該話題的程序都 PUSH 數據到該 URL 上。
- PUB 約定一個 URL,所有需要獲取該話題的程序都 SUB 到該 URL 上。
- 程序內部循環將 PULL 讀取的數據發送到 PUB 上。
以上則可以模擬出 ROS topic 數據合并 或者 LCM 中 channel 的類似功能。
整體上看,NNG 的 API 很簡約,主要是 4 個,open/recv/send/close,open 根據協議不同使用的函數會不同。配置則是 setopt/getopt,與 UNIX API 類似。API 中沒有上下文環境(context-less)依賴,只需要一個 nng_socket,這種設計和實現方法值得去學習一下(初步揣測應該是使用指針值作為handle,如果要強制編譯器做類型檢測,則會套上一層 struct,如 typedef struct { _nng_xxx_socket * p } nng_socket;)。
NNG 協議基本上囊括了常見的通信需求,一些特殊的需求,也可以通過組合協議來實現,比如上面的模擬 ROS topic 或者 LCM channel 的方法。這樣一來,如果在程序中使用 NNG,不管是多進程,還是多線程,通過設計,可以進一步增強模塊化,同時不乏靈活性。如果環境變化,程序不管是由多進程改成多線程,還是由多線程改成多主機,都很容易實現。
常見模塊/進程/線程間通信,可以依據具體需求來使用 PIPELINE(消息隊列) 還是 REQ/REP(過程調用),而不是鎖+全局變量,每個模塊單元只需要做單一相關的具體事務,無需知曉全局狀態。
1.5 代碼結構
nng.h:
nng對外暴露的 api 接口
transport.h:
通信層定義,主要是為了暴露給用戶以實現擴展,但目前包含了utils下的相關頭文件,其中inproc.h/ipc.h/tcp.h是對應的transport
protocol.h:
協議層定義,也是為了暴露給用戶以實現擴展,其中reqrep.h/pubsub.h/bus.h/pair.h/pipeline.h/survey.h是對應的protocol
utils/:
實用工具包,包含基本數據結構(list/queue/hash)、互斥及原子操作(mutex/atomic)等
transports/:
通信層實現,包括(inproc:進程內通信;ipc:進程間通信;tcp:tcp通信)
protocols/:
協議層實現,包括(REQREP:請求響應;PUBSUB:訂閱發布等)
core/:
通用代碼
aio/:
線程池模擬的異步操作,帶狀態機的事件驅動等
二 結構介紹
2.1 nng_aio
一個異步 I/O 句柄。這個 aio 結構的細節是 AIO 框架私有的。該結構具有公共名稱 (nng_aio),以便我們最大限度地減少公共 API 命名空間中的污染。 AIO 框架之外的任何東西訪問這些成員中的任何一個都是一個編碼錯誤——這里提供定義是為了方便內聯,但這應該是唯一的用途。
2.2 nni_id_map
我們發現我們經常希望有一個由數字 ID 列出的事物列表,它通常是單調遞增的。這通常是管道 ID。為了幫助保持這些事物的集合由它們的 ID(可能從一個非常大的值開始)索引,我們提供了一個哈希表。哈希表使用開放尋址,但我們使用更好的探針(取自 Python)以避免命中相同的位置。我們的哈希算法只是低位,我們使用的表大小是 2 的冪。請注意,散列項必須為非 NULL。該表受內部鎖保護。
三、數據傳輸
3.1 發送數據
nng_sendmsg
nng_aio_set_timeout
nng_aio_set_msg
nng_send_aio
nni_aio_get_msg
nni_sock_find
nni_sock_send --> sock_send
nni_sock_rele
nng_aio_wait
nng_aio_result
3.2 接收數據
nng_recvmsg
nng_aio_set_timeout
nng_recv_aio
nni_sock_find
nni_sock_recv --> sock_recv
nni_sock_rele
nng_aio_wait
nng_aio_result
nng_aio_free
四、AIO
4.1 AIO 狀態
AIO 結構可以攜帶最多 4 個不同的輸入值,最多 4 個不同的輸出值,以及最多 4 個不同的“私有狀態”值。 輸入和輸出的含義由被調用的 I/O 函數決定。
typedef enum {
NNG_INIT_RECV = 0,
NNG_RECV_RET_SEND,
NNG_SEND_RET_RECV,
NNG_RECV_RET_RECV,
} nng_aio_state_t;
4.2 AIO 介紹
AIO 只能由調用者“完成”,調用者必須調用 nni_aio_finish 。在發生這種情況之前,調用者保證 AIO 有效。調用者必須保證一個 AIO 將“完成”(通過調用 nni_aio_finish )。
請注意,取消例程可能會被框架多次調用。框架(或消費者)保證 AIO 將在這些調用中保持有效,以便提供者可以自由地檢查 aio 的列表成員資格等。但是提供者不能多次調用完成。
nni_aio_lk 用于保護 AIO 上的標志以及 AIO 上的過期列表。 如果到期未完成,我們將不允許將 AIO 標記為已完成。
為了與過期同步,我們將 aio 記錄為過期,并在銷毀它之前等待該記錄被清除(或至少不等于 aio)。
aio 框架與 taskq 框架緊密結合。當調用者將 aio 標記為開始(使用 nni_aio_begin)時,我們為 aio“準備”任務,并將任務標記為忙碌。然后,當我們想知道操作本身是否完成時,我們所要做的就是等待任務完成(忙碌標志被清除)。
為了防止在拆卸期間 aio 重用,我們設置了 a_stop 標志。在該點之后為新操作初始化的任何嘗試都將失敗,并且調用者將獲得 NNG_ECANCELED 指示這一點。調用 nni_aio_begin() 的提供者必須檢查返回值,如果返回非零值 (NNG_ECANCELED),那么它必須簡單地丟棄請求并返回。
調用 nni_aio_wait 等待當前未完成的操作完成,但不會阻止另一個操作在同一個 aio 上啟動。要同步停止 aio 并防止在其上啟動任何進一步的操作,請調用 nni_aio_stop。為了防止操作開始,而無需等待任何現有操作完成,請調用 nni_aio_close。
在某些地方,我們想檢查 aio 是否未使用。從技術上講,如果這些檢查通過,那么它們就不需要用鎖來完成,因為調用者應該擁有對它們的唯一引用。然而,競爭檢測器不一定知道這個語義,并且可能會抱怨潛在的數據競爭。要抑制誤報,請定義 NNG_RACE_DETECTOR。注意這會導致獲取額外的鎖,影響性能,所以不要在生產中使用它。
總結
- 上一篇: 海康威视nas安全_确保NAS安全的6件
- 下一篇: 基于51单片机农业土壤湿度监测及自动灌溉