JBoss 系列十七:使用JGroups构建块MessageDispatcher 构建群组通信应用
內容概要
本部分說明JGroups構建塊接口MessageDispatcher,具體提供一個簡單示例來說明如何使用JGroups構建塊MessageDispatcher 構建群組通信應用
示例描述
構建塊基于通道之上,是對通道API的更高層抽象,MessageDispatcher提供異步和同步的方法發送消息給集群中的成員并等待響應,我們知道直接使用jGroups API可以向群組發送消息,或從群組接收消息,但發送和接收之間沒有任何聯系,而使用MessageDispatcher發送消息和接收消息是在同一個事務中完成。本示例演示使用MessageDispatcher 發送消息到集群所有成員并等待響應,驗證 GET_ALL 響應模式,超時時間等屬性。
示例步驟
本示例集群中有三個成員node1,node2 和node3,node1 為協調者(第一個加入集群)負責集群視圖的更新。三個節點都是做相同的事情,向集群中所有節點發送一條消息,并等待接收響應消息。使用JBoss Cluster Framework Demo 介紹所示的方法,任意從SourceForge下載或編譯生成DEMO_HOME,本示例的啟動腳本msgDispatcher.sh位于DEMO_HOME/bin下。接下來我們依次啟動三個節點:
?
./msgDispatcher.sh -n node1?
./msgDispatcher.sh -n node2?
./msgDispatcher.sh -n node3
注意,-n指定一個節點名字,Windows操作系統使用對應.bat腳本。
?
結果分析
1. node1,node2 和node3中任何一個節點啟動后我們都可以看到發送消息到群組的日志信息,如下為node1上輸出的日志:
?
16:36:43,699 INFO [MessageDispatcherTest] Casting message to all group members
2.?node1,node2 和node3中任何一個節點啟動發送完消息后都可以看到群組中返回消息的輸出信息,如下為node3上打印輸出的響應消息:
?
?
Responses:node3-MessageDispatcher Test Messagenode2-MessageDispatcher Test Messagenode1-MessageDispatcher Test Message?
?
我們可以看到node3節點打印輸出的響應消息有三條,及集群中的所有節點都收到node3發送的消息,且集群中所有節點發送的響應消息都被node3收到。我們可以通過響應模式來控制是否消息發送者要等待所有節點的響應消息,本示例中使用的是 GET_ALL 響應模式,所以只有當接收到所有節點的響應消息或等待超時拋出異常才終止。
3.?node1,node2 和node3中任何一個節點啟動后的日志中我們可以看到MyRequestHandler處理消息輸出的日志,如下為node1上輸出的日志:
?
16:37:11,839 INFO [MyRequestHandler] node3, MessageDispatcher Test Message
4.?node1,node2 和node3中任何一個節點運行的日志中我們可以看到MyMembershipListener輸出的日志,當群組成員關系發生變化是群視圖被打印輸出,如下為node1上輸出的日志信息:
?
?
16:37:11,655 INFO [MyMembershipListener] ViewAccepted, [node1|2] [node1, node2, node3]?
?
[node1|2] [node1, node2, node3]為打印輸出的群組視圖,node1|2 表示群組的協調者為node1,豎線后面的2表示視圖被更新了3次(node1|0,node1|1,node1|2);視圖中共有三個成員,即node1,node2 和node3。
代碼分析
本示例所有的源代碼可以在cluster/jgroups/stu/src/main/java/.../blocks下找到,接下來我們從代碼的層面去解釋上面的分析結果,這樣有助于更直觀的理解構建塊MessageDispatcher的理解:
?
47 channel = new JChannel(props);48 if(null != name) {49 channel.setName(name);50 }51 handler = new MyRequestHandler(channel);52 messageListener = new MyMessageListener();53 membershipListener = new MyMembershipListener(); 55 disp = new MessageDispatcher(channel, messageListener, membershipListener, handler);56 channel.connect("MessageDispatcherTestGroup"); 58 Util.sleep(100);59 logger.info("Casting message to all group members");60 Message message = new Message(null, null, new String("MessageDispatcher Test Message"));61 rsp_list = disp.castMessage(null, message, new RequestOptions().setMode(ResponseMode.GET_ALL).setTimeout(0)); 63 System.out.println("Responses:"); 65 List list = rsp_list.getResults();66 for(Object obj : list) {67 System.out.println(" " + obj);68 }
如上47-50行創建一個,設定相應節點的名字;55行實例化一個 MessageDispatcher,我們可以看到,構建塊接口是基于通道之上,是對通道的更高層抽象,所以創建 MessageDispatcher時需要傳遞一個實例化的通道(channel),MessageDispatcher 實例化后通道開始連接到群組(如56行所示);59-61行創建一個消息,通過 castMessage()方法將消息發送到群組,注意,目的地址為空,所以消息接收者為群組所有成員,響應模式為 GET_ALL,所以發送者等待所有成員響應后 castMessage()方法返回,超時時間為 0,即castMessage一直出于等待狀態知道所有節點響應消息返回或超時拋出異常;65-68行打印輸出所有響應返回消息。
?
?
9 public class MyRequestHandler implements RequestHandler {...20 public Object handle(Message msg) throws Exception {21 Address sender = msg.getSrc();22 logger.info(sender + ", " + msg.getObject());23 return channel.getName() + "-" + msg.getObject();24 }25 26 }?
?
如上第9行,MyRequestHandler實現RequestHandler接口,RequestHandler定義了handle方法,用來處理接收到的消息,20-24行為我們實現的handle方法,我們在日志中記錄消息發送者的名字和消息的內容(結果分析第3點node1輸出的日志“node3, MessageDispatcher Test Message”表示消息發送著是node3,消息的內容是“MessageDispatcher Test Message”),并將自己的名字和接收到消息的內容返回(結果分析第2點node3輸出的日志包括“node1-MessageDispatcher Test Message”表示該響應消息是從node1返回的)。
?
8 public class MyMembershipListener implements MembershipListener {...12 public void viewAccepted(View view) {13 logger.info("ViewAccepted, " + view);14 }
如上第8行表示MyMembershipListener實現了MembershipListener接口,MembershipListener接口定義一些成員控制的方法,這里我們實現了viewAccepted()方法,即當群組成員發生變化時該方法被調運,記錄群組視圖到日志中。
?
總結
以上是生活随笔為你收集整理的JBoss 系列十七:使用JGroups构建块MessageDispatcher 构建群组通信应用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql复制的配置
- 下一篇: android 设置Button或者Im