ActiveMq消费端实现集群部署
1.問題背景
一個事件中心接收網(wǎng)關通過ActiveMq上報的告警事件,處理后持久化到數(shù)據(jù)庫,消息模型為發(fā)布訂閱模式。為了實現(xiàn)高可用,決定將事件中心進行集群部署,運行兩個實例。
但是由于消息模型為發(fā)布/訂閱(publish/subscribe,topic),每個eps實例都會收到告警消息。如不加以控制,勢必會造成告警消息重復消費的問題。
即我們需要不同的應用系統(tǒng)關心相同的消息,同時單個應用系統(tǒng)內部又可以部署多個實例達到負載均衡和故障轉移,提高系統(tǒng)的健壯性。
2.解決思路
我們首先想到可以通過消費端增加檢測重復消息的邏輯,來解決重復消費的問題,但是這種方式增加額外判斷邏輯,且浪費消費端性能,并不可取。
實際上我們可以通過ActiveMq的一些高級特性來很好的解決此問題。ActiveMq提供了虛擬主題(Virtual Topics)的功能,如下圖:
即通過一些配置后,單個應用系統(tǒng)內多個實例監(jiān)聽同一個queue,實例之間即可對消息進行平均消費,共同承擔消費的功能。如果其中一個eps實例宕機,其他實例仍可以正常消費消息,消息不會丟失遺漏。
Virtual Topic這個功能特性可以關閉,即useVirtualTopics屬性,默認為true,設置為false即可關閉此功能。當此功能開啟,并且使用了持久化的存儲時,ActiveMq的broker啟動的時候會從持久化存儲里拿到所有的Topic的名稱,如果名稱模式與VirtualTopics匹配,則把它們添加到系統(tǒng)的Virtual Topics列表中去。當有consumer訪問此VirtualTopics時,系統(tǒng)會自動創(chuàng)建持久化的queue,并在每次Topic收到消息時,分發(fā)到具體的queue。
3.使用方法
虛擬主題有兩種使用方式,下面分別進行介紹。
3.1 topic-queue名稱匹配方式
此種方式需要發(fā)布者發(fā)布的Topic以“VirtualTopic.”這樣的前綴來命名(大小寫敏感)。比如我們定義一個VirtualTopic.event,然后發(fā)布者將消息發(fā)布到VirtualTopic.event。訂閱者需要訂閱名稱為”Consumer.*. VirtualTopic.event”這樣的隊列。
下面使用springboot搭建工程進行演示,由于配置簡單只涉及到topic和queue的名稱,下面只顯示關鍵步驟。
首先發(fā)布者創(chuàng)建名稱VirtualTopic.event的topic,然后每隔3秒向此topic發(fā)布一條消息,
然后建立兩個應用作為消費者分別監(jiān)聽名為“Consumer.A.VirtualTopic.event“的queue,
第一個消費者:
第二個消費者:
@Service public class JmsMessageListener {@JmsListener(destination = "Consumer.A.VirtualTopic.event",containerFactory = "QueueContainerFactory")public void receiveMessage(String message){System.out.println(" 2 接收到消息: "+JsonUtil.json2Object(message,Event.class));} }最后啟動兩個消費者和發(fā)布者,通過兩個消費者的輸出窗口可以看到他們共同承擔了消息的消費:
1 接收到消息: Event{name='觸碰', id=2, date=Tue Aug 28 21:18:50 CST 2018} 1 接收到消息: Event{name='觸碰', id=4, date=Tue Aug 28 21:18:56 CST 2018} 1 接收到消息: Event{name='觸碰', id=6, date=Tue Aug 28 21:19:02 CST 2018} 1 接收到消息: Event{name='觸碰', id=8, date=Tue Aug 28 21:19:08 CST 2018} 1 接收到消息: Event{name='觸碰', id=10, date=Tue Aug 28 21:19:14 CST 2018} 1 接收到消息: Event{name='觸碰', id=12, date=Tue Aug 28 21:19:20 CST 2018} 1 接收到消息: Event{name='觸碰', id=14, date=Tue Aug 28 21:19:26 CST 2018} 1 接收到消息: Event{name='觸碰', id=16, date=Tue Aug 28 21:19:32 CST 2018} 1 接收到消息: Event{name='觸碰', id=18, date=Tue Aug 28 21:19:38 CST 2018} 2 接收到消息: Event{name='觸碰', id=1, date=Tue Aug 28 21:18:47 CST 2018}2 接收到消息: Event{name='觸碰', id=3, date=Tue Aug 28 21:18:53 CST 2018}2 接收到消息: Event{name='觸碰', id=5, date=Tue Aug 28 21:18:59 CST 2018}2 接收到消息: Event{name='觸碰', id=7, date=Tue Aug 28 21:19:05 CST 2018}2 接收到消息: Event{name='觸碰', id=9, date=Tue Aug 28 21:19:11 CST 2018}2 接收到消息: Event{name='觸碰', id=11, date=Tue Aug 28 21:19:17 CST 2018}2 接收到消息: Event{name='觸碰', id=13, date=Tue Aug 28 21:19:23 CST 2018}2 接收到消息: Event{name='觸碰', id=15, date=Tue Aug 28 21:19:29 CST 2018}2 接收到消息: Event{name='觸碰', id=17, date=Tue Aug 28 21:19:35 CST 2018}2 接收到消息: Event{name='觸碰', id=19, date=Tue Aug 28 21:19:41 CST 2018}2 接收到消息: Event{name='觸碰', id=21, date=Tue Aug 28 21:19:47 CST 2018}2 接收到消息: Event{name='觸碰', id=23, date=Tue Aug 28 21:19:53 CST 2018}然后我們將消費者2關閉(宕機),可以看到消費者1消費了所有的消息
1 接收到消息: Event{name='觸碰', id=134, date=Tue Aug 28 21:25:26 CST 2018} 1 接收到消息: Event{name='觸碰', id=135, date=Tue Aug 28 21:25:29 CST 2018} 1 接收到消息: Event{name='觸碰', id=136, date=Tue Aug 28 21:25:32 CST 2018} 1 接收到消息: Event{name='觸碰', id=137, date=Tue Aug 28 21:25:35 CST 2018} 1 接收到消息: Event{name='觸碰', id=138, date=Tue Aug 28 21:25:38 CST 2018}通過以上試驗,我們看到通過虛擬主題的方式,兩個實例可以通過監(jiān)聽同一個queue來平均消費消息,如果其中一個宕機,另一個會承擔起所有的消息消費。
上面的方式需要發(fā)布者和訂閱者統(tǒng)一對命名規(guī)范,如果發(fā)布者和訂閱者已經(jīng)存在,就需要統(tǒng)一升級,比較麻煩。實際上我們還可以攔截器的方法。
3.2 攔截器方式
這種方式需要修改ActiveMq的配置文件/conf/activemq.xml,添加以下攔截配置:
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name="event" prefix="demo.*." selectorAware="false"/> </virtualDestinations></virtualDestinationInterceptor> </destinationInterceptors>然后發(fā)布者只需要往event主題上發(fā)布消息。訂閱者通過訂閱類似demo.A. event的隊列的方式來消費。其他沒有集群部署的應用仍可以訂閱event主題進行消費。
此種方式的優(yōu)點在于我們不需要對發(fā)布者和不需要改造的訂閱者做任何變動,需要增加或者改造的訂閱者使用虛擬主題的方式進行訂閱即可達到負載均衡和故障轉移的目的,當無法約束發(fā)布者發(fā)布的topic規(guī)范時,這種方式很有用。當然缺點就是需要修改ActiveMQ的配置,也就是說需要重啟ActiveMQ,這對于已經(jīng)上線的平臺來說可能造成消息丟失。
參考:虛擬主題開發(fā)
總結
以上是生活随笔為你收集整理的ActiveMq消费端实现集群部署的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring的aware学习
- 下一篇: postgres中分组后拼接多行