消息队列NetMQ 原理分析2-IO线程和完成端口
目錄
- 前言
- 介紹
- 目的
- IO線程
- 初始化IO線程
- Proactor
- 啟動Procator線程輪詢
- 處理socket
- IOObject
- 總結
前言
介紹
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標準socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。 當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.0-rc5。本文檔是對4.0.0-rc5分支代碼進行分析。zeromq的英文文檔
NetMQ的英文文檔
目的
對NetMQ的源碼進行學習并分析理解,因此寫下該系列文章,本系列文章暫定編寫計劃如下:
友情提示: 看本系列文章時最好獲取源碼,更有助于理解。
IO線程
NetMQ 4.0.0底層使用的是IOCP(即完成端口)模式進行通信的(3.3.4使用的是select模型),通過異步IO綁定到完成端口,來最大限度的提高性能。這里不對同步/異步socket進行詳細介紹。稍微解釋下完成端口,為了解決每個socket客戶端使用一個線程進行通信的性能問題,完成端口它充分利用內核對象的調度,只使用少量的幾個線程來處理和客戶端的所有通信,消除了無謂的線程上下文切換,最大限度的提高了網絡通信的性能。
想詳細了解完成端口的請看完成端口(Completion Port)詳解,講解的比較詳細,同時對各種網絡編程模型做了簡單的介紹。
因此NetMQ通過幾個(默認1個)IO線程處理通信,上一片文章介紹了ZObejct對象,在該對象中存在許多命令的處理,實際對命令的發送,分配都是IO線程的工作。
初始化IO線程
IO線程初始化時會初始化Proactor和IOThreadMailbox
var name = "iothread-" + threadId; m_proactor = new Proactor(name); m_mailbox = new IOThreadMailbox(name, m_proactor, this);Proactor對象就是用來綁定或處理完成端口用的,后面再做作詳細介紹。
IOThreadMailbox是IO線程處理的信箱,每當有命令需要處理時,都會向當前Socket對象所在的IO線程信箱發送命令。
讓我們看一眼IOThread對象和IOThreadMailbox的定義
IOThread對象繼承自ZObject對象,記得上一節想到ZObject對象知道如何處理各種命令嗎?因此IOThread對象也繼承了他父親的技能。同時IOThread對象實現了IMailboxEvent接口,這個接口之定義了一個方法。
internal interface IMailboxEvent {void Ready(); }當IO信箱接受到命令時表示當前有命令準備好了,可以進行 處理,IO信箱則會調用IO線程的Ready方法處理命令,那么IO信息如何調用IO線程的Ready方法呢,來看下IOThreadMailbox的構造函數。
internal class IOThreadMailbox : IMailbox {...public IOThreadMailbox([NotNull] string name, [NotNull] Proactor proactor, [NotNull] IMailboxEvent mailboxEvent){m_proactor = proactor;m_mailboxEvent = mailboxEvent;Command cmd;bool ok = m_commandPipe.TryRead(out cmd);}... }在IOThreadMailbox初始化時,傳入了IMailboxEvent。
m_commandPipe是NetMQ的管道(Pipe),后面我們會對其做介紹,這里只要知道該管道用于存放命令即可,可以__暫時__理解為管道隊列。
Proactor
每個IOThread會有一個Proactor,Proactor的工作就是將Socket對象綁定到完成端口,然后定時去掃描完成端口是否有需要處理的Socket對象。
internal class Proactor : PollerBase {...public Proactor([NotNull] string name){m_name = name;m_stopping = false;m_stopped = false;m_completionPort = CompletionPort.Create();m_sockets = new Dictionary<AsyncSocket, Item>();}... }Proactor對象繼承自PollerBase,那么PollerBase又是什么呢?從命名可以看這是一個輪詢基類,即該對象需要長時間不斷循環處理某件事情。
PollerBase對象是一個抽象類,它有2個功能:
負載均衡
還記的Context中選擇IO線程時有這個一段代碼嗎?
IO線程的負載均衡功能就是PollBase對象提供的
protected void AdjustLoad(int amount) { Interlocked.Add(ref m_load, amount); }
public int Load { get { #if NETSTANDARD1_3 return Volatile.Read(ref m_load); #else Thread.MemoryBarrier(); return m_load; #endif } }
在IOThread取PollBase對象(Proactor)的Load屬性時候會特殊處理,保證拿到的是最新的值。
定時任務
PollBase第二個功能就是支持定時任務,即定時觸發某事件。
PollBase內部有一個SortedList,key為任務執行的時間,value為TimeInfo。
TimeInfo對象包含2個信息,id和ITimerEvent接口,id用來辨別當前任務的類型,ITimerEvent接口就包含了TimerEvent方法,即如何執行。
如TcpConnection連接失敗會重新連接時會重連,下面時TcpConnection開始連接方法
IO線程會被封裝到IOObject中,調用IOObject的AddTimer方法實際就是調用IO線程中Proactor對象的AddTimer方法,其方法定義如下
public void AddTimer(long timeout, [NotNull] IProactorEvents sink, int id) {long expiration = Clock.NowMs() + timeout;var info = new TimerInfo(sink, id);if (!m_timers.ContainsKey(expiration))m_timers.Add(expiration, new List<TimerInfo>());m_timers[expiration].Add(info); }第一行會獲取當前的毫秒時間加上時間間隔。然后加入到m_timers中。
初始化時會創建完成端口,當有socket需要處理時會和完成端口綁定。
初始化時還會初始化一個存放異步AsyncSocket和item的字典。
有關于AsyncSocket和CompletionPort可以去Git上看AsyncIO的源碼,這里不做分析。
Item結構如下
它包含了IProactorEvents接口的信息和當前Socket操作是否被取消標志。
internal interface IProactorEvents : ITimerEvent {void InCompleted(SocketError socketError, int bytesTransferred);void OutCompleted(SocketError socketError, int bytesTransferred); }IProactorEvents繼承自ITimerEvent。同時它還聲明了InCompleted和OutCompleted方法,即發送或接收完成時如何處理,因此當需要處理Socket時,會將當前Socket處理方式保存到這個字典中。當當前對象發送消息完成,則會調用OutCompleted方法,接收完成時則會調用InCompleted方法。
當有Socket需要綁定時會調用Proactor的AddSocket方法
它包含2個參數,一個時異步Socket對象和IProactorEvents。然后加把他們加入到字段中并將他們綁定到完成端口上。第四段AdjustLoad方法即把當前IO線程處理數量+1,用于負載均衡用。
當Socket操作完成時會調用Proactor的RemoveSocket移除綁定
public void RemoveSocket(AsyncSocket socket) {AdjustLoad(-1);var item = m_sockets[socket];m_sockets.Remove(socket);item.Cancelled = true; }移除時會將item的Cancelled字段設置為true。所以當Proactor輪詢處理Socket時發現該Socket操作被取消(移除),就會跳過處理。
啟動Procator線程輪詢
在IO線程啟動時實際就是啟動Procator的work線程
public void Start() {m_proactor.Start(); } public void Start() {m_worker = new Thread(Loop) { IsBackground = true, Name = m_name };m_worker.Start(); }處理socket
完整的Loop方法如下
private void Loop() {var completionStatuses = new CompletionStatus[CompletionStatusArraySize];while (!m_stopping){// Execute any due timers.int timeout = ExecuteTimers();int removed;if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))continue;for (int i = 0; i < removed; i++){try{if (completionStatuses[i].OperationType == OperationType.Signal){var mailbox = (IOThreadMailbox)completionStatuses[i].State;mailbox.RaiseEvent();}// if the state is null we just ignore the completion statuselse if (completionStatuses[i].State != null){var item = (Item)completionStatuses[i].State;if (!item.Cancelled){switch (completionStatuses[i].OperationType){case OperationType.Accept:case OperationType.Receive:item.ProactorEvents.InCompleted(completionStatuses[i].SocketError,completionStatuses[i].BytesTransferred);break;case OperationType.Connect:case OperationType.Disconnect:case OperationType.Send:item.ProactorEvents.OutCompleted(completionStatuses[i].SocketError,completionStatuses[i].BytesTransferred);break;default:throw new ArgumentOutOfRangeException();}}}}catch (TerminatingException){ }}} } var completionStatuses = new CompletionStatus[CompletionStatusArraySize];第一行初始化了CompletionStatus數組,CompletionStatusArraySize值為100。
CompletionStatus作用是用來保存socket的信息或狀態。
獲取超時時間
int timeout = ExecuteTimers(); protected int ExecuteTimers() {if (m_timers.Count == 0)return 0;long current = Clock.NowMs();var keys = m_timers.Keys;for (int i = 0; i < keys.Count; i++){var key = keys[i];if (key > current){return (int)(key - current);}var timers = m_timers[key];foreach (var timer in timers){timer.Sink.TimerEvent(timer.Id);}timers.Clear();m_timers.Remove(key);i--;}return 0; }ExecuteTimers會計算之前加入到m_timers需要等待的超時時間,若沒有對象則直接返回0,否則獲取若獲取到key時間在當前時間之前,則需要調用TimerEvent方法,調用完成后移除。
若獲取到的key時間比當前時間大,則返回他們的差即為需要等待的超時時間。
從完成端口獲取處理完的狀態
int removed; if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))continue;GetMultipleQueuedCompletionStatus方法傳入一個超時時間,若前面獲取的超時時間為0,則這邊會設置為-1,表示阻斷直到有要處理的才返回。
CompletionPort內部維護了一個狀態隊列,removed即為處理完成返回的狀態個數。
若獲取成功則會返回true,后面就開始遍歷completionStatuses數組處理完成Socket。
開始處理待處理的狀態
public struct CompletionStatus {internal CompletionStatus(AsyncSocket asyncSocket, object state, OperationType operationType, SocketError socketError, int bytesTransferred) : this(){AsyncSocket = asyncSocket;State = state;OperationType = operationType;SocketError = socketError;BytesTransferred = bytesTransferred;}public AsyncSocket AsyncSocket { get; private set; }public object State { get; internal set; }public OperationType OperationType { get; internal set; }public SocketError SocketError { get; internal set; }public int BytesTransferred { get; internal set; } }CompletionStatus是個結構體,它包含的信息如上。其中OperationType是當前Socket的處理方式。
public enum OperationType {Send, Receive, Accept, Connect, Disconnect, Signal }在for循環的一開始先會判斷當前狀態的OperationType,若是Signal,則說明當前是個信號狀態,說明有命令需要處理,則會調用IO信箱的RaiseEvent方法,實際為IO線程的Ready方法。
public void Ready() {Command command;while (m_mailbox.TryRecv(out command))command.Destination.ProcessCommand(command); }IOThread會將當前信箱的所有命令進行處理。
若不是Signal則會將CompletionStatus保存的狀態信息轉換為Item對象,并判斷當前Socket是否移除(取消)。若沒有則對其進行處理。判斷OperationType,若為Accept或Receive則表示需要接收,則調用InCompleted方法。若為Connect,Disconnect或Send則表示有消息向外發送,則調用OutCompleted方法。
至此IOThread代碼分析完畢。
IOObject
internal class IOObject : IProactorEvents {public IOObject([CanBeNull] IOThread ioThread){if (ioThread != null)Plug(ioThread);}public void Plug([NotNull] IOThread ioThread){Debug.Assert(ioThread != null);m_ioThread = ioThread;} }IOObject實際就是保存了IOThread的信息和Socket處理完成時如何執行,以及向外暴露了一些接口。
再次說明,如果向簡單了解完成端口如何使用,則看《完成端口使用》,如果想詳細了解完成端口則看下《完成端口詳細介紹》,如果想直到NetMQ的AsyncIO和完成端口的源碼請看AsyncIO。
總結
該篇介紹了IO線程和完成端口的處理方式,若哪里分析的不到位或有誤希望支出。
本文地址:https://www.cnblogs.com/Jack-Blog/p/6347163.html
作者博客:杰哥很忙
歡迎轉載,請在明顯位置給出出處及鏈接)
轉載于:https://www.cnblogs.com/Jack-Blog/p/6347163.html
總結
以上是生活随笔為你收集整理的消息队列NetMQ 原理分析2-IO线程和完成端口的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 做梦梦到槐花预示着什么
- 下一篇: 统计单词个数(划分型)