HornetQ之JMS2.0 (实例讲解)
前言:
在2013年4月終于迎來了新的JMS規范-JMS2.0,這是第一次對JMS規范進行更新從2002年發布的JMS1.1版本.我們也許會認為JMS這么久以來從來沒更新是否是因為已經停止發展或者被廢棄不用.?但是,如果你從另外一個叫角度來分析,?JMS這個規范存在很多不同的實現版本來看,?就充分說明JMS是一個非常成功的API?規范。
在JMS2.0規范中,主要包括兩方面的重大改進。其一是:更方便的使用API,?再則是:引入了許多新的消息特性。JMS2.0是JAVAEE7平臺的一部分,它不但可以被用于到JAVAEE?Web或者EJB應用程序重也可以被用于J2SE環境中.
接下來我們就來看看新特性和更方便的API.
?
簡單API:
為了下面的講解做準備,首先創建一個queue和一個topic在hornetq-jms.xml中
<queue name="test1"> <entry name="queue/test1"/> </queue> <topic name="test2"> <entry name="topic/test2"/> </topic >
合并Connection和session
在老的API中要生產/消費一個消息必須經過這么幾步,獲取connectionFactory->獲取queue/topic->創建Connection->創建session->生產/消費消息.?在JMS2.0中提供了更簡單的API,將創建connection和創建session合并成了一個對象JMSContext,對應的MessageProducer/MessageConsume分別用JMSProcedure/JMSConsumer替代.對參數封裝上也相對于JMS1.1更易于理解,在老版本中JMS的事務和ACK模式都是在創建session時顯示聲明的,?并且兩個參數很容易誤導開發者:Session?session?=?conn.createSession(false,?Session.AUTO_ACKNOWLEDGE);第一個參數代表是啟用事務,第二個參數代表ACK模式。如果啟用事務第二參數將被忽略但是第二個參數又是必須的,這點就容易誤導后來的程序維護工程師.在JMS2.0API中對這個特性進行了整改通過builder模式。
Context cnx = new InitialContext(); ConnectionFactory cf = (ConnectionFactory) cnx.lookup("/ConnectionFactory"); Queue queue = (Queue) cnx.lookup("/queue/test1"); try(JMSContext jmsContext = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE);) { jmsContext.createProducer() .setDeliveryDelay(1000) .setDeliveryMode(DeliveryMode.NON_PERSISTENT) .send(queue, "content1"); System.out.println("Continue.............."); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }直接在創建JMSContext的時聲明事務模式和ACK模式,兩者選其一。?
public static final int AUTO_ACKNOWLEDGE = 1; public static final int CLIENT_ACKNOWLEDGE = 2; public static final int DUPS_OK_ACKNOWLEDGE = 3; public static final int SESSION_TRANSACTED = 0;
利用JDK7的closeable接口實現自動關閉,觀察上面例子我們并沒有類似于JMS1.1API那樣去關閉Connection關閉session,什么時候關閉的呢?JMSContext利用了JDK1.7的新特性它繼承至AutoCloseable接口,當塊代碼結束的時候自動調用對象的close()方法進行連接關閉.
更方便進行異步消費消息,在JMS1.1中需要手動調用connection.start()方法去開啟消費進程。在新的API中默認自動就啟動了,?不需要顯示的啟動。
Context cnx; JMSContext jmsContext =null; try { cnx = new InitialContext(); ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); Queue queue = (Queue) cnx.lookup("/queue/test1"); jmsContext = cf.createContext(); // jmsContext.setAutoStart(true); JMSConsumer jmsConsumer = jmsContext.createConsumer(queue); jmsConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println(Message.class.getName()); } }); CountDownLatch latch = new CountDownLatch(1); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }finally{ if(jmsContext!=null){ jmsContext.close(); } }?
消息payload自動解析通過泛型,在老版本中需要進行手動的類型轉換來獲取到真實的消息內容.第一個步驟需要將Message轉化為對應的TextMessage/ByteMessage/MapMessage/StreamMessage/ObjectMessage.?在新的API中除了StreamMessage不能自動檢索外,?其他類型的message都可以通過方便的API直接獲取到消息內容不必經過多次的強制類型轉化。
Context cnx = null; ConnectionFactory cf = null; Queue queue = null; try { cnx = new InitialContext(); cf = (ConnectionFactory) cnx.lookup("/ConnectionFactory"); queue = (Queue) cnx.lookup("/queue/test1"); } catch (Exception e) { } try (JMSContext jmsContext = cf .createContext(JMSContext.AUTO_ACKNOWLEDGE);) { // Delay 3 seconds to devlivery .setDeliveryDelay(3000) jmsContext.createProducer() .setDeliveryDelay(DeliveryMode.PERSISTENT) .send(queue, "content@").send(queue, "content@1") .send(queue, "content@2"); System.out.println("Start receive.............."); JMSConsumer jmsConsumer = jmsContext.createConsumer(queue); String msgBody = jmsConsumer.receiveBody(String.class); System.out.println(msgBody); Message msg1 = jmsConsumer.receive(); System.out.println(msg1.getBody(String.class)); Message msg2 = jmsConsumer.receive(); System.out.println(msg2.getBody(String.class)); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); }
新特性:
允許多個訂閱者在同一個Topic?Subscription,想象現在消息的吞吐量很大,?在老的版本中我們一個消息只能綁定到一個訂閱者。但是消息兩很大希望能做到負載均衡類似于Apache。JMS2.0提供這個功能,支持在多個虛擬機中共享一個消息。新建四個訂閱者在四個(兩組)不同的VM中.
第一組:
Client1-S1:
Context cnx; try { cnx = new InitialContext(); final ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); final Topic topic = (Topic) cnx.lookup("/topic/test2"); // Invalid concurrent session usage. Sessions are not supposed to be // used by more than one thread concurrently. new Thread() { public void run() { JMSContext jmsContext = cf.createContext(); JMSConsumer jmsConsumer = jmsContext.createSharedConsumer( topic, "S1"); while (true) { Message msg = jmsConsumer.receive(); System.out.println("Client1-S1:"+Thread.currentThread().getId() + ":" + msg); } }; }.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
Client2-S1:
Context cnx; try { cnx = new InitialContext(); final ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); final Topic topic = (Topic) cnx.lookup("/topic/test2"); // Invalid concurrent session usage. Sessions are not supposed to be // used by more than one thread concurrently. new Thread() { public void run() { JMSContext jmsContext = cf.createContext("guest", "guest"); JMSConsumer jmsConsumer = jmsContext.createSharedConsumer( topic, "S1"); while (true) { Message msg = jmsConsumer.receive(); System.out.println("Client2-S1:"+Thread.currentThread().getId() + ":" + msg); } }; }.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
第二組:
Client3-S2:
Context cnx; try { cnx = new InitialContext(); final ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); final Topic topic = (Topic) cnx.lookup("/topic/test2"); // Invalid concurrent session usage. Sessions are not supposed to be // used by more than one thread concurrently. new Thread() { public void run() { JMSContext jmsContext = cf.createContext("guest", "guest"); JMSConsumer jmsConsumer = jmsContext.createSharedConsumer( topic, "S2"); while (true) { Message msg = jmsConsumer.receive(); System.out.println("Client3-S2:"+Thread.currentThread().getId() + ":" + msg); } }; }.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
Client4-S2:
Context cnx; try { cnx = new InitialContext(); final ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); final Topic topic = (Topic) cnx.lookup("/topic/test2"); // Invalid concurrent session usage. Sessions are not supposed to be // used by more than one thread concurrently. new Thread() { public void run() { JMSContext jmsContext = cf.createContext(); JMSConsumer jmsConsumer = jmsContext.createSharedConsumer( topic, "S2"); while (true) { Message msg = jmsConsumer.receive(); System.out.println("Client4-S2:"+Thread.currentThread().getId() + ":" + msg); } }; }.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
分兩次發送消息到同一個topic觀察輸出結果在四個VM控制臺.
第一次發送消息:
Client4-S2:12:HornetQMessage[ID:5f856d00-bff1-11e3-86fb-af8e1cdeda67]:PERSISTENT
Client1-S1:12:HornetQMessage[ID:4b696a47-bff1-11e3-8b74-d32c9a2e8eec]:PERSISTENT
第二次發送消息:
Client3-S2:12:HornetQMessage[ID:4b696a47-bff1-11e3-8b74-d32c9a2e8eec]:PERSISTENT
Client2-S1:12:HornetQMessage[ID:5f856d00-bff1-11e3-86fb-af8e1cdeda67]:PERSISTENT
延遲遞送消息,假象有這么一種業務模型,當前有兩個數據庫環境生產庫PROD和一個數據倉庫DWS,PROD數據會實時的復制到數據倉庫。?現在外部程序插入一條數據進入PROD?并且同時生成一個消息到消息服務器,消息服務器的消費者會立刻消費這個消息并且和DWS數據進行整合,由于數據庫的復制可能會因為CPU或者LOCK等原因有一定的也延時,此時就可能數據還沒有到DWS數據庫此時消費者就不能完成數據整合操作。一種方案就是重復消費,?這樣就有一點開銷對于消費者來說。?我們希望一次就能夠保證消費成功。那么我們就可以受用延遲遞送消息到消費者。假設數據復制到DWS最大延時是1h,我們可以設置消息的延遲遞送時間為1h就能夠保證消費者只需要一次消費消息就可以達到數據整合目的。?設置延遲遞送類似于JMS1.1也非常簡單.
Context cnx = new InitialContext(); ConnectionFactory cf = (ConnectionFactory) cnx.lookup("/ConnectionFactory"); Queue queue = (Queue) cnx.lookup("/queue/test1"); try(JMSContext jmsContext = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE);) { jmsContext.createProducer() .setDeliveryDelay(10000) .setDeliveryMode(DeliveryMode.NON_PERSISTENT) .send(queue, "content1"); System.out.println("Continue.............."); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
經過筆者測試:Hornetq延遲遞送特性,必須是消費者和生產者在不同的虛擬機。實際情況也會這么用。
異步發送消息.類似于異步消費,同樣提供更為簡便的API進行異步發送。?HornetQ似乎對這點支持不太好,有異步發送的效果。
private void asyncSendSimplified(ConnectionFactory connectionFactory,Queue queue) throws Exception { // send a message asynchronously try (JMSContext context = connectionFactory.createContext();){ CountDownLatch latch = new CountDownLatch(1); MyCompletionListener myCompletionListener = new MyCompletionListener(latch); context.createProducer().setAsync(myCompletionListener).send(queue,"Hello world"); System.out.println("Message sent, now waiting for reply"); // at this point we can do something else before waiting for the reply // this is not shown here latch.await(); if (myCompletionListener.getException()==null){ System.out.println("Reply received from server"); } else { throw myCompletionListener.getException(); } } }
JMSXDeliveryCount,這個是Message的一個屬性,實際上在Jms1.1已經存在了,標示這個消息被重復遞送的次數.但是這個在JMS1.1是可選的并不是所有的供應商都支持這個屬性,但是在JMS2.0這個屬性被強制的設置了。?我們可以通過這個屬性來判斷消息被重復遞送了多少次來決定是否丟棄這個消息或者做進一步處理,或者交給DeadLetter.
try { System.out.println(message.getIntProperty("JMSXDeliveryCount")); } catch (JMSException e) { }
參考博客:
http://www.oracle.com/technetwork/articles/java/jms2messaging-1954190.html
http://www.oracle.com/technetwork/articles/java/jms20-1947669.html
轉載于:https://blog.51cto.com/ganludong/1393076
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的HornetQ之JMS2.0 (实例讲解)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: oracle导入数据
- 下一篇: 现代控制理论第八版第二章读书笔记