基于消息与.Net Remoting的分布式处理架构
分布式處理在大型企業應用系統中,最大的優勢是將負載分布。通過多臺服務器處理多個任務,以優化整個系統的處理能力和運行效率。分布式處理的技術核心是完 成服務與服務之間、服務端與客戶端之間的通信。在.Net 1.1中,可以利用Web Service或者.Net Remoting來實現服務進程之間的通信。本文將介紹一種基于消息的分布式處理架構,利用了.Net Remoting技術,并參考了CORBA Naming Service的處理方式,且定義了一套消息體制,來實現分布式處理。?
?一、消息的定義
?????? 要實現進程間的通信,則通信內容的載體——消息,就必須在服務兩端具有統一的消息標準定義。從通信的角度來看,消息可以分為兩類:Request Messge和Reply Message。為簡便起見,這兩類消息可以采用同樣的結構。
?????? 消息的主體包括ID,Name和Body,我們可以定義如下的接口方法,來獲得消息主體的相關屬性:
C#語言:? public interface IMessage:ICloneable{
???? IMessageItemSequence GetMessageBody();
???? string GetMessageID();
???? string GetMessageName();
???? void SetMessageBody(IMessageItemSequence aMessageBody);
???? void SetMessageID(string aID);
???? void SetMessageName(string aName);
}
??? 消息主體類Message實現了IMessage接口。在該類中,消息體Body為IMessageItemSequence類型。這個類型用于Get和Set消息的內容:Value和Item:
C#語言:? public interface IMessageItemSequence:ICloneable{??????
???? IMessageItem GetItem(string aName);
???? void SetItem(string aName,IMessageItem aMessageItem);???????
???? string GetValue(string aName);??
???? void SetValue(string aName,string aValue);
}
?????? Value為string類型,并利用HashTable來存儲Key和Value的鍵值對。而Item則為IMessageItem類型,同樣的在 IMessageItemSequence的實現類中,利用HashTable存儲了Key和Item的鍵值對。
?????? IMessageItem支持了消息體的嵌套。它包含了兩部分:SubValue和SubItem。實現的方式和IMessageItemSequence相似。定義這樣的嵌套結構,使得消息的擴展成為可能。一般的結構如下:
?????? IMessage——Name
???????????????????? ——ID
???????????????????? ——Body(IMessageItemSequence)
??????????????????????????? ——Value
??????????????????????????? ——Item(IMessageItem)
?????????????????????????????????? ——SubValue
?????????????????????????????????? ——SubItem(IMessageItem)
????????????????????????????????????????? ——……
?????? 各個消息對象之間的關系如下:
?
?????? 在實現服務進程通信之前,我們必須定義好各個服務或各個業務的消息格式。通過消息體的方法在服務的一端設置消息的值,然后發送,并在服務的另一端獲得這些值。例如發送消息端定義如下的消息體:
IMessageItemSequence body = factory.CreateMessageItemSequence();
body.SetValue("name1","value1");
body.SetValue("name2","value2");
IMessageItem item = factory.CreateMessageItem();
item.SetSubValue("subname1","subvalue1");
item.SetSubValue("subname2","subvalue2");
IMessageItem subItem1 = factory.CreateMessageItem();
subItem1.SetSubValue("subsubname11","subsubvalue11");
subItem1.SetSubValue("subsubname12","subsubvalue12");
IMessageItem subItem2 = factory.CreateMessageItem();
subItem1.SetSubValue("subsubname21","subsubvalue21");
subItem1.SetSubValue("subsubname22","subsubvalue22");
item.SetSubItem("subitem1",subItem1);
item.SetSubItem("subitem2",subItem2);
body.SetItem("item",item);
//Send Request Message
MyServiceClient service = new MyServiceClient("Client");
IMessageItemSequence reply = service.SendRequest("TestService","Test1",body);
?????? 在接收消息端就可以通過獲得body的消息體內容,進行相關業務的處理。?
?二、.Net Remoting服務
?????? 在.Net中要實現進程間的通信,主要是應用Remoting技術。根據前面對消息的定義可知,實際上服務的實現,可以認為是對消息的處理。因此,我們可以對服務進行抽象,定義接口IService:
C#語言:? public interface IService{
???? IMessage Execute(IMessage aMessage);
}
??????? Execute()方法接受一條Request Message,對其進行處理后,返回一條Reply Message。在整個分布式處理架構中,可以認為所有的服務均實現該接口。但受到Remoting技術的限制,如果要實現服務,則該服務類必須繼承自 MarshalByRefObject,同時必須在服務端被Marshal。隨著服務類的增多,必然要在服務兩端都要對這些服務的信息進行管理,這加大了 系統實現的難度與管理的開銷。如果我們從另外一個角度來分析服務的性質,基于消息處理而言,所有服務均是對Request Message的處理。我們完全可以定義一個Request服務負責此消息的處理。
?????? 然而,Request服務處理消息的方式雖然一致,但畢竟服務實現的業務,即對消息處理的具體實現,卻是不相同的。對我們要實現的服務,可以分為兩大類: 業務服務與Request服務。實現的過程為:首先,具體的業務服務向Request服務發出Request請求,Request服務偵聽到該請求,然后 交由其偵聽的服務來具體處理。
?????? 業務服務均具有發出Request請求的能力,且這些服務均被Request服務所偵聽,因此我們可以為業務服務抽象出接口IListenService:
C#語言: public interface IListenService{
???? IMessage OnRequest(IMessage aMessage);?
}
??????? Request服務實現了IService接口,并包含IListenService類型對象的委派,以執行OnRequest()方法:
C#語言:? public class RequestListener:MarshalByRefObject,IService{
???? public RequestListener(IListenService listenService)
???? {
???????? m_ListenService = listenService;
???? }
???? private IListenService m_ListenService;
???? #region IService Members
???? public IMessage Execute(IMessage aMessage)
???? {
???????? return this.m_ListenService.OnRequest(aMessage);
???? }??????
???? #endregion
???? public override object InitializeLifetimeService()
???? {
???????? return null;
???? }
}
?????? 在RequestListener服務中,繼承了MarshalByRefObject類,同時實現了IService接口。通過該類的構造函數,接收IListService對象。
?????? 由于Request消息均由Request服務即RequestListener處理,因此,業務服務的類均應包含一個RequestListener的 委派,唯一的區別是其服務名不相同。業務服務類實現IListenService接口,但不需要繼承MarshalByRefObject,因為被 Marshal的是該業務服務內部的RequestListener對象,而非業務服務本身:
C#語言:? public abstract class Service:IListenService{
???? public Service(string serviceName)
???? {
???????? m_ServiceName = serviceName;?
???????? m_RequestListener = new RequestListener(this);
???? }??????
???? #region IListenService Members
???? public IMessage OnRequest(IMessage aMessage)
???? {
???????? //……
???? }?
???? #endregion
???? private string m_ServiceName;
???? private RequestListener m_RequestListener;????
}
?????? Service類是一個抽象類,所有的業務服務均繼承自該類。最后的服務架構如下:
?
?????? 我們還需要在Service類中定義發送Request消息的行為,通過它,才能使業務服務被RequestListener所偵聽。?
C#語言:? public IMessageItemSequence SendRequest(string aServiceName,string??????????????????????????????????????? aMessageName,IMessageItemSequence aMessageBody){
???? IMessage message = m_Factory.CreateMessage();
???? message.SetMessageName(aMessageName);
???? message.SetMessageID("");
???? message.SetMessageBody(aMessageBody);
???? IService service = FindService(aServiceName);
???? IMessageItemSequence replyBody = m_Factory.CreateMessageItemSequence();
???? if (service != null)
???? {
????????? IMessage replyMessage = service.Execute(message);
????????? replyBody = replyMessage.GetMessageBody();?????????
???? }
???? else
???? {?????????
????????? replyBody.SetValue("result","Failure");?????????
???? }
???? return replyBody;
}
?????? 注意SendRequest()方法的定義,其參數包括服務名,消息名和被發送的消息主體。而在實現中最關鍵的一點是FindService()方法。我 們要查找的服務正是與之對應的RequestListener服務。不過,在此之前,我們還需要先將服務Marshal:
C#語言:? public void Initialize(){?????????????????????????????????????????
??? RemotingServices.Marshal(this.m_RequestListener,this.m_ServiceName +? ".RequestListener");
}
?????? 我們Marshal的對象,是業務服務中的Request服務對象m_RequestListener,這個對象在Service的構造函數中被實例化:
C#語言: m_RequestListener = new RequestListener(this);?????? 注意,在實例化的時候是將this作為IListenService對象傳遞給RequestListener。因此,此時被Marshal的服務對象, 保留了業務服務本身即Service的指引。可以看出,在Service和RequestListener之間,采用了“雙重委派”的機制。
?????? 通過調用Initialize()方法,初始化了一個服務對象,其類型為RequestListener(或IService),其服務名 為:Service的服務名 + ".RequestListener"。而該服務正是我們在SendRequest()方法中要查找的Service:
C#語言: IService service = FindService(aServiceName);?????? 下面我們來看看FindService()方法的實現:
C#語言:? protected IService FindService(string aServiceName){
??? lock (this.m_Services)
??? {
???????? IService service = (IService)m_Services[aServiceName];
???????? if (service != null)
???????? {
???????????? return service;
???????? }
???????? else
???????? {
???????????? IService tmpService = GetService(aServiceName);
???????????? AddService(aServiceName,tmpService);
???????????? return tmpService;
???????? }
??? }
}
??????? 可以看到,這個服務是被添加到m_Service對象中,該對象為SortedList類型,服務名為Key,IService對象為Value。如果沒有找到,則通過私有方法GetService()來獲得:
C#語言: private IService GetService(string aServiceName){
??? IService service = (IService)Activator.GetObject(typeof(RequestListener),
???????? "tcp://localhost:9090/" + aServiceName + ".RequestListener");
??? return service;
}
??????? 在這里,Channel、IP、Port應該從配置文件中獲取,為簡便起見,這里直接賦為常量。
?????? 再分析SendRequest方法,在找到對應的服務后,執行了IService的Execute()方法。此時的IService為 RequestListener,而從前面對RequestListener的定義可知,Execute()方法執行的其實是其偵聽的業務服務的 OnRequest()方法。
?????? 我們可以定義一個具體的業務服務類,來分析整個消息傳遞的過程。該類繼承于Service抽象類:
C#語言:? public class MyService:Service{
???? public MyService(string aServiceName):base(aServiceName)
???? {}?????????
}
??????? 假設把進程分為服務端和客戶端,那么對消息處理的步驟如下:
?1、 在客戶端調用MyService的SendRequest()方法發送Request消息;
?2、 查找被Marshal的服務,即RequestListener對象,此時該對象應包含對應的業務服務對象MyService;
?3、 在服務端調用RequestListener的Execute()方法。該方法則調用業務服務MyService的OnRequest()方法。
在這些步驟中,除了第一步在客戶端執行外,其他的步驟均是在服務端進行。
?三、業務服務對于消息的處理
?????? 前面實現的服務架構,已經較為完整地實現了分布式的服務處理。但目前的實現,并未體現對消息的處理。我認為,對消息的處理,等價與具體的業務處理。這些業 務邏輯必然是在服務端完成。每個服務可能會處理單個業務,也可能會處理多個業務。并且,服務與服務之間仍然存在通信,某個服務在處理業務時,可能需要另一 個服務的業務行為。也就是說,每一種類的消息,處理的方式均有所不同,而這些消息的唯一標識,則是在SendRequest()方法已經有所體現的 aMessageName。
?????? 雖然,處理的消息不同,所需要的服務不同,但是根據我們對消息的定義,我們仍然可以將這些消息處理機制抽象為一個統一的格式;在.Net中,體現這種機制的莫過于委托delegate。我們可以定義這樣的一個委托:
C#語言:? public delegate void RequestHandler(string aMessageName,IMessageItemSequence aMessageBody,ref IMessageItemSequence aReplyMessageBody);?????? 在RequestHandler委托中,它代表了這樣一族方法:接收三個入 參,aMessageName,aMessageBody,aReplyMessageBody,返回值為void。其中,aMessageName代表 了消息名,它是消息的唯一標識;aMessageBody是待處理消息的主體,業務所需要的所有數據都存儲在aMessageBody對象中。 aReplyMessageBody是一個引用對象,它存儲了消息處理后的返回結果,通常情況下,我們可以 用<"result","Success">或<"result", "Failure">來代表處理的結果是成功還是失敗。
?????? 這些委托均在服務初始化時被添加到服務類的SortedList對象中,鍵值為aMessageName。所以我們可以在抽象類中定義如下方法:?????
C#語言:? protected abstract void AddRequestHandlers();protected void AddRequestHandler(string aMessageName,RequestHandler handler)
{
??? lock (this.m_EventHandlers)
??? {
???????? if (!this.m_EventHandlers.Contains(aMessageName))
???????? {
???????????? this.m_EventHandlers.Add(aMessageName,handler);
???????? }
??? }
}
protected RequestHandler FindRequestHandler(string aMessageName)
{
??? lock (this.m_EventHandlers)
??? {
???????? RequestHandler handler = (RequestHandler)m_EventHandlers[aMessageName];
???????? return handler;
??? }
}
?????? AddRequestHandler()用于添加委托對象與aMessageName的鍵值對,而FindRequestHandler()方法用于查找 該委托對象。而抽象方法AddRequestHandlers()則留給Service的子類實現,簡單的實現如MyService的 AddRequestHandlers()方法:
C#語言:? public class MyService:Service{
???? public MyService(string aServiceName):base(aServiceName)
???? {}
???? protected override void AddRequestHandlers()
???? {
???????? this.AddRequestHandler("Test1",new RequestHandler(Test1));
???????? this.AddRequestHandler("Test2",new RequestHandler(Test2));
???? }
???? private void Test1(string aMessageName,IMessageItemSequence aMessageBody,ref? IMessageItemSequence aReplyMessageBody)
???? {
???????? Console.WriteLine("MessageName:{0}\n",aMessageName);
???????? Console.WriteLine("MessageBody:{0}\n",aMessageBody);
???????? aReplyMessageBody.SetValue("result","Success");
???? }
???? private void Test2(string aMessageName,IMessageItemSequence aMessageBody,ref?? IMessageItemSequence aReplyMessageBody)
???? {
???????? Console.WriteLine("Test2" + aMessageBody.ToString());
???? }
}
?????? Test1和Test2方法均為匹配RequestHandler委托簽名的方法,然后在AddRequestHandlers()方法中,通過調用 AddRequestHandler()方法將這些方法與MessageName對應起來,添加到m_EventHandlers中。
?????? 需要注意的是,本文為了簡要的說明這種處理方式,所以簡化了Test1和Test2方法的實現。而在實際開發中,它們才是實現具體業務的重要方法。而利用這種方式,則解除了服務之間依賴的耦合度,我們隨時可以為服務添加新的業務邏輯,也可以方便的增加服務。
?????? 通過這樣的設計,Service的OnRequest()方法的最終實現如下所示:
C#語言:? public IMessage OnRequest(IMessage aMessage){
??? string messageName = aMessage.GetMessageName();
??? string messageID = aMessage.GetMessageID();
??? IMessage message = m_Factory.CreateMessage();
??? IMessageItemSequence replyMessage = m_Factory.CreateMessageItemSequence();
??? RequestHandler handler = FindRequestHandler(messageName);
??? handler(messageName,aMessage.GetMessageBody(),ref replyMessage);
??? message.SetMessageName(messageName);
??? message.SetMessageID(messageID);
??? message.SetMessageBody(replyMessage);
??? return message;
}
?????? 利用這種方式,我們可以非常方便的實現服務間通信,以及客戶端與服務端間的通信。例如,我們分別在服務端定義MyService(如前所示)和TestService:
C#語言: public class TestService:Service{
???? public TestService(string aServiceName):base(aServiceName)
???? {}
???? protected override void AddRequestHandlers()
???? {
???????? this.AddRequestHandler("Test1",new RequestHandler(Test1));????????
???? }
???? private void Test1(string aMessageName,IMessageItemSequence aMessageBody,ref? IMessageItemSequence aReplyMessageBody)
???? {??????????
???????? aReplyMessageBody = SendRequest("MyService",aMessageName,aMessageBody);
???????? aReplyMessageBody.SetValue("result2","Success");
???? }
}
?????? 注意在TestService中的Test1方法,它并未直接處理消息aMessageBody,而是通過調用SendRequest()方法,將其傳遞到MyService中。
?????? 對于客戶端而言,情況比較特殊。根據前面的分析,我們知道除了發送消息的操作是在客戶端完成外,其他的具體執行都在服務端實現。所以諸如 MyService和TestService等服務類,只需要部署在服務端即可。而客戶端則只需要定義一個實現Service的空類即可:
C#語言: public class MyServiceClient:Service{
??? public MyServiceClient(string aServiceName):base(aServiceName)
???? {}
???? protected override void AddRequestHandlers()
???? {}
}
?????? MyServiceClient類即為客戶端定義的服務類,在AddRequestHandlers()方法中并不需要實現任何代碼。如果我們在 Service抽象類中,將AddRequestHandlers()方法定義為virtual而非abstract方法,則這段代碼在客戶端服務中也可 以省去。另外,客戶端服務類中的aServiceName可以任意賦值,它與服務端的服務名并無實際聯系。至于客戶端具體會調用哪個服務,則由 SendRequest()方法中的aServiceName決定:
C#語言: IMessageFactory factory = new MessageFactory();IMessageItemSequence body = factory.CreateMessageItemSequence();
//……
MyServiceClient service = new MyServiceClient("Client");
IMessageItemSequence reply = service.SendRequest("TestService","Test1",body);
??????? 對于service.SendRequest()的執行而言,會先調用TestService的Test1方法;然后再通過該方法向MyService發送,最終調用MyService的Test1方法。
?????? 我們還需要另外定義一個類,負責添加服務,并初始化這些服務:
C#語言:? public class Server{
???? public Server()
???? {
???????? m_Services = new ArrayList();
???? }
???? private ArrayList m_Services;???
???? public void AddService(IListenService service)
???? {
???????? this.m_Services.Add(service);
???? }
???? public void Initialize()
???? {?
???????? IDictionary tcpProp = new Hashtable();
???????? tcpProp["name"] = "tcp9090";
???????? tcpProp["port"] = 9090;
???????? TcpChannel channel = new TcpChannel(tcpProp,new BinaryClientFormatterSinkProvider(),
???????????????????????????????????????????????????? new BinaryServerFormatterSinkProvider());??????????
???????? ChannelServices.RegisterChannel(channel);
???????? foreach (Service service in m_Services)
???????? {
????????????? service.Initialize();
???????? }??????????
???? }
}
?????? 同理,這里的Channel,IP和Port均應通過配置文件讀取。最終的類圖如下所示:
?
?????? 在服務端,可以調用Server類來初始化這些服務:
C#語言:? static void Main(string[] args){?
??? MyService service = new MyService("MyService");
??? TestService service1 = new TestService("TestService");
??? Server server = new Server();
??? server.AddService(service);
??? server.AddService(service1);
??? server.Initialize();
??? Console.ReadLine();
}
?四、結論
?????? 利用這個基于消息與.Net Remoting技術的分布式架構,可以將企業的業務邏輯轉換為對消息的定義和處理。要增加和修改業務,就體現在對消息的修改上。服務間的通信機制則完全 交給整個架構來處理。如果我們將每一個委托所實現的業務(或者消息)理解為Contract,則該結構已經具備了SOA的雛形。當然,該架構僅僅處理了消 息的傳遞,而忽略了對底層事件的處理(類似于Corba的Event Service),這個功能我想留待后面實現。
?????? 唯一遺憾的是,我缺乏驗證這個架構穩定性和效率的環境。應該說,這個架構是我們在企業項目解決方案中的一個實踐。但是解決方案則是利用了CORBA中間 件,在Unix環境下實現并運行。本架構僅僅是借鑒了核心的實現思想和設計理念,從而完成的在.Net平臺下的移植。由于Unix與Windows Server的區別,其實際的優勢還有待驗證。
?
轉載于:https://blog.51cto.com/wayfarer/279909
總結
以上是生活随笔為你收集整理的基于消息与.Net Remoting的分布式处理架构的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SCRUM 12.03
- 下一篇: 用HiTool烧写uboot到spi f