分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载
一、分布式消息總線以及基于Socket的實現
???? 在前面的分享一個分布式消息總線,基于.NET Socket Tcp的發布-訂閱框架,附代碼下載一文之中給大家分享和介紹了一個極其簡單也非常容易上的基于.NET Socket Tcp 技術實現的分布消息總線,也是一個簡單的發布訂閱框架:
??? 并且以案例的形式為大家演示了如何使用這個分布式消息總線架構發布訂閱架構模式的應用程序,在得到各位同仁的反饋的同時,大家也非常想了解訂閱者離線的情況,即支持離線構發布訂閱框架。
二、離線架構
???? 不同于訂閱者、發布者都同時在線的情況,支持訂閱者離線,架構將有所變化,如下圖所示:
???? 也會比原先的結構將更加復雜,其中需要處理以下兩個關鍵點:
???? 1)訂閱者的持久化存儲。
???? 2)訂閱者離線之后其所訂閱消息的持久存儲。
三、解決方案
???? 為解決消息總線的離線支持機制,我們在Socket 框架之中增加了一個接口ISubscribeStorager:
1: using System; 2: using System.Collections.Generic; 3: using System.Linq; 4: using System.Text; 5:? 6: namespace EAS.Messages 7: { 8: /// <summary> 9: /// 消息訂閱存儲接口。 10: /// </summary> 11: public interface ISubscribeStorager 12: { 13: /// <summary> 14: /// 持久化訂閱。 15: /// </summary> 16: /// <param name="subscriber">訂閱者。</param> 17: /// <param name="topic">消息主題。</param> 18: void Subscribe(string subscriber, string topic); 19:? 20: /// <summary> 21: /// 持久化退訂。 22: /// </summary> 23: /// <param name="subscriber">訂閱者。</param> 24: /// <param name="topic">消息主題。</param> 25: void Unsubscribe(string subscriber, string topic); 26:? 27: /// <summary> 28: /// 裝載訂閱信息。 29: /// </summary> 30: /// <returns>系統之中的訂閱清單。</returns> 31: List<SubscribeItem> LoadSubscribes(); 32:? 33: /// <summary> 34: /// 寫入消息。 35: /// </summary> 36: /// <param name="subscriber">訂閱者。</param> 37: /// <param name="message">消息對象。</param> 38: void Write(string subscriber, QueueMessage message); 39:? 40: /// <summary> 41: /// 讀消息。 42: /// </summary> 43: /// <param name="subscriber">訂閱者。</param> 44: /// <param name="message">消息對象。</param> 45: /// <returns>成功讀取返回true,否則返回false。</returns> 46: bool Read(string subscriber, out QueueMessage message); 47: } 48: }???? ISubscribeStorager共提供持久化訂閱持久化消息存儲共五個函數,其中:
???? LoadSubscribes:服務端初始化時讀取所有的離線訂閱關系,即那個訂閱都訂閱那那個主題。
???? Subscribe:持久化訂閱者,當訂閱才上線訂閱消息時,持久化訂閱關系,供離線檢測之用。
???? Unsubscribe:持久化取消訂閱,當訂閱者退訂消息時,從持久化訂閱關系之中刪除。
???? Write:當訂閱者離線時,把訂閱消息寫入持久化存儲。
???? Read:當離線訂閱者上線時,從持久存儲之中讀取一條消息向其發送。
???? ISubscribeStorager:可以選擇自己實現這個接口,以建立滿足自己規則的離線存儲機制,當然在AgileEAS.NET SOA 中間件之中提供了兩種離線存儲機制,存儲于數據庫和存儲于MSMQ,下面向大家介紹一下這兩種內置實現。
四、兩種內置離線存儲機制
???? 在AgileEAS.NET SOA 中間件平臺之中提供了兩個ISubscribeStorager的實現,基于數據庫的離線訂閱存儲實現EAS.Messages.DbSubscribeStorager和基于MSMQ的離線訂閱存儲實現EAS.Messages.MsmqSubscribeStorager。
???? EAS.Messages.DbSubscribeStorager:存儲訂閱關系在messageSubscribe.Config文件之中,消息存儲在關系數據庫SOA_SUBSCRIBEEVENTS表之中,使用前必須要建立相應的表結構,以下是SQL Server的DDL腳本:
1: CREATE TABLE [SOA_SUBSCRIBEEVENT]( 2: [GUID] [varchar](36) NOT NULL, 3: [SUBSCRIBER] [nvarchar](128) NOT NULL, 4: [TOPIC] [nvarchar](128) NOT NULL, 5: [BODY] [image] NULL, 6: [FCTIME] [datetime] NOT NULL, 7: CONSTRAINT [PK_SOA_SUBSCRIBEEVENT] PRIMARY KEY CLUSTERED 8: ( 9: [GUID] ASC 10: ) 11: )????? 目前理論上支持SQLServer 、Mysql、ORACLE、Sqlite四種數據庫結構,具體建表腳本請自行參考相應資料書寫,也可以使用AgileEAS.NET SOA中間件所提供的數據庫初始化工具創建。
????? EAS.Messages.MsmqSubscribeStorager:存儲訂閱關系在messageSubscribe.Config文件之中,消息存儲Msmq消息隊列之中,使用之前請確保機器上安裝了MSMQ消息對列。
五、關于自定義實現ISubscribeStorager
???? 有興趣的朋友可以自定義實現接口ISubscribeStorager,這樣就可以按自己的規則進行存儲,比如把離線消息存儲到mongodb、Redis、或者直接存儲在文件之中,或者其他更多的實現規則,在此就不一一介紹,如有相關興趣,請聯系作者,如確有必要需要給在家介紹一下如何實現,將會另開一文本介紹如何自定義實現ISubscribeStorager接口。
六、改進在線例子支持離線
???? 還是跟上次一樣,以案例為在家展示一下怎么進行離線消息,就不重新開始例子,對原有例子做一些改進,改進后例子如下:
???? 其中在原有項目的基礎上增加了:Demo.Subscriber1和Demo.Subscriber2項目,其項目配置代碼、配置文件基本上同Demo.Subscriber一樣,其中唯一的差別在于,Demo.Subscriber1和Demo.Subscriber2向服務器提交訂閱的時候都增加一個另friendName參數,其使用IMessageBus接口的以下訂閱函數:
1: /// <summary> 2: /// 訂閱消息。 3: /// </summary> 4: /// <param name="subscriber">訂閱者。</param> 5: /// <param name="friendName">訂閱者名稱,用于處理離線訂閱。</param> 6: /// <param name="topic">主題。</param> 7: /// <param name="notifyHandler">訂閱通知。</param> 8: void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);??????????????? Demo.Publisher項目為發布者代碼。
??????????????? Demo.Subscriber項目為訂閱者代碼。
??????????????? Demo.Server項目為服務端代碼。
???? Demo.Subscriber1項目之中,其Program.cs代碼如下:
1: using System; 2: using System.Collections.Generic; 3: using System.Linq; 4: using System.Windows.Forms; 5: using EAS.Messages; 6:? 7: namespace Demo.Subscriber1 8: { 9: class Program 10: { 11: static void Main(string[] args) 12: { 13: var container = EAS.Objects.ContainerBuilder.BuilderDefault(); 14: var bus = container.GetComponentInstance("MessageBus") as IMessageBus; 15: System.Console.WriteLine("Subscriber1"); 16:? 17: bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify); 18: System.Console.ReadLine(); 19: } 20:? 21: static void MessageNotify(object m) 22: { 23: Demo.Messages.Message message = m as Demo.Messages.Message; 24: System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID)); 25: } 26: } 27: }???? 其中bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);在訂閱消息的時候給了一個friendName為Subscriber1,Demo.Subscriber2與Demo.Subscriber1項目的唯一的差別就是此處為Subscriber2.
???? 我們使用內置的EAS.Messages.DbSubscribeStorager,則不需要修改服務端的代碼,只需要修改服務端的配置文件如下:
1: <?xml version="1.0" encoding="utf-8"?> 2: <configuration> 3: <configSections> 4: <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" /> 5: </configSections> 6: <startup useLegacyV2RuntimeActivationPolicy="true"> 7: <supportedRuntime version="v4.0"/> 8: </startup> 9: <eas> 10: <objects> 11: <!--數據庫連接--> 12: <object name="DbProvider" assembly="EAS.Data" type="EAS.Data.Access.SqlClientDbProvider" LifestyleType="Thread"> 13: <property name="ConnectionString" type="string" value="Data Source=.;Initial Catalog=eas_db;Integrated Security=SSPI;Connect Timeout=30" /> 14: </object> 15: <!--數據訪問器--> 16: <object name="DataAccessor" assembly="EAS.Data" type="EAS.Data.Access.DataAccessor" LifestyleType="Thread"> 17: <property name="DbProvider" type="object" value="DbProvider"/> 18: <property name="Language" type="object" value="TSqlLanguage"/> 19: </object> 20: <!--ORM訪問器--> 21: <object name="OrmAccessor" assembly="EAS.Data" type="EAS.Data.ORM.OrmAccessor" LifestyleType="Thread"> 22: <property name="DataAccessor" type="object" value="DataAccessor"/> 23: </object> 24: <!--查詢語言--> 25: <object name="TSqlLanguage" assembly="EAS.Data" type="EAS.Data.Linq.TSqlLanguage" LifestyleType="Thread"/> 26: <!--消息持久化--> 27: <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.DbSubscribeStorager" LifestyleType="Singleton"/> 28: <!--日志管理--> 29: <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton"> 30: <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" /> 31: </object> 32: </objects> 33: </eas> 34: </configuration>???? 在配置文件的IOC配置之中我們配置了消息存儲對象以及其所依賴的數據庫訪問對象、Linq查詢語言表達式,另外需要說明的是,我們需要把配置文件之中所涉及的EAS.Data.dll、EAS.SOA.BootStrap.dll復制到編譯輸出Publish,這兩個文件可以從AgileEAS.NET SOA 中間件平臺發布包之中尋找,本案例的下載壓碎包之中會包括這兩個文件。
???? 有關于基于Msmq的配置,只需要修改配置文件如下:
1: <?xml version="1.0" encoding="utf-8"?> 2: <configuration> 3: <configSections> 4: <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" /> 5: </configSections> 6: <startup useLegacyV2RuntimeActivationPolicy="true"> 7: <supportedRuntime version="v4.0"/> 8: </startup> 9: <eas> 10: <objects> 11: <!--消息持久化--> 12: <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.MsmqSubscribeStorager" LifestyleType="Singleton"/> 13: <!--日志管理--> 14: <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton"> 15: <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" /> 16: </object> 17: </objects> 18: </eas> 19: </configuration>???? 到此為止,所有代碼均已完成,是不是很簡單,接下來,我們跑起來驗證一下效果。
七、驗證效果
???? 我們在編譯輸入目錄Publish下先啟動Demo.Server.exe,再各啟動Demo.Subscriber.exe、Demo.Subscriber1.exe、Demo.Subscriber2.exe,再啟動一個Demo.Publisher.exe,在Demo.Publisher.exe控制臺按回車鍵:
目前程序三個訂閱者都是在線的,Demo.Publisher發布了三條消息,三個訂閱者都收到了三條消息,那么我們關閉Demo.Subscriber2之后再由Demo.Publisher發布兩條消息:
然后我們再啟動Demo.Subscriber2,看是否還能收到其離線之后由Demo.Publisher發布的兩條消息:
OK,到此為止。
八、源代碼下載
???? 本程序的源代碼已上傳到服務器,請通過http://112.74.65.50/downloads/eas/Demo.Pub_Sub_Offline.rar進行下載,如果在開發過程之中想要了解更多有關Socket通信框架以及更多AgileEAS.NET SOA中間件平臺的技術資源,請通過AgileEAS.NET SOA 網站:http://www.smarteas.net的最新下載欄目進行下載。???
九、問題反饋
???? 麻煩大家在通過視頻進行學習的時候能及時把問題反饋給樓主,或者有什么需要改進的一些建議都請向樓主直接反饋,以下是聯系方式:
團隊網站:http://www.agilelab.cn
AgileEAS.NET網站:http://www.agileeas.net
官方博客:http://eastjade.cnblogs.com
github:https://github.com/agilelab/eas
樓主QQ:47920381
QQ群:113723486(AgileEAS SOA 平臺)/上限1000人
199463175(AgileEAS SOA 交流)/上限1000人
120661978(AgileEAS.NET 平臺交流)/上限1000人
郵件:james@agilelab.cn,mail.james@qq.com,
電話:18629261335。
轉載于:https://www.cnblogs.com/eastjade/p/3926651.html
總結
以上是生活随笔為你收集整理的分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓学习-界面-布局-FrameLayo
- 下一篇: C. Present(二分 + 扫描线)