ActiveMQ的几种集群配置
生活随笔
收集整理的這篇文章主要介紹了
ActiveMQ的几种集群配置
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
ActiveMQ是一款功能強大的消息服務器,它支持許多種開發語言,例如Java, C, C++, C#等等。企業級消息服務器無論對服務器穩定性還是速度,要求都很高,而ActiveMQ的分布式集群則能很好的滿足這一需求,下面說說ActiveMQ的幾種集群配置。Queue consumer clusters此集群讓多個消費者同時消費一個隊列,若某個消費者出問題無法消費信息,則未消費掉的消息將被發給其他正常的消費者,結構圖如下:
一、activeMQ主要的幾類部署方式比較1、默認的單機部署(kahadb)activeMQ的默認存儲的單機方式,以本地kahadb文件的方式存儲,所以性能指標完全依賴本地磁盤IO,不能提供高可用。2、基于zookeeper的主從(levelDB Master/Slave)5.9.0新推出的主從實現,基于zookeeper來選舉出一個master,其他節點自動作為slave實時同步消息。因為有實時同步數據的slave的存在,master不用擔心數據丟失,所以leveldb會優先采用內存存儲消息,異步同步到磁盤。所以該方式的activeMQ讀寫性能都最好,特別是寫性能能夠媲美非持久化消息。優點:實現高可用和數據安全性能較好缺點:因為選舉機制要超過半數,所以最少需要3臺節點,才能實現高可用。3、基于共享數據庫的主從(Shared JDBC Master/Slave)可以基于postgres、mysql、oracle等常用數據庫。每個節點啟動都會爭搶數據庫鎖,從而保證master的唯一性,其他節點作為備份,一直等待數據庫鎖的釋放。因為所有消息讀寫,其實都是數據庫操作,activeMQ節點本身壓力很小,性能完全取決于數據庫性能。優點:實現高可用和數據安全簡單靈活,2臺節點就可以實現高可用缺點:穩定性依賴數據庫性能依賴數據庫
ActiveMQ 集群配置 高可用自從activemq5.9.0開始,activemq的集群實現方式取消了傳統的Pure Master Slave方式,增加了基于zookeeper+leveldb的實現方式,其他兩種方式:目錄共享和數據庫共享依然存在。1、Master-Slave部署方式 1)、Shared Filesystem Master-Slave方式 2)、Shared Database Master-Slave方式3)、Replicated LevelDB Store方式第一種方案同樣支持N個AMQ實例組網,但由于他是基于kahadb存儲策略,亦可以部署在分布式文件系統上,應用靈活、高效且安全。 第二種方案與shared filesystem方式類似,只是共享的存儲介質由文件系統改成了數據庫而已,支持N個AMQ實例組網,但他的性能會受限于數據庫; 第三種方案是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協調選擇一個node作為master。被選擇的master broker node開啟并接受客戶端連接。 其他node轉入slave模式,連接master并同步他們的存儲狀態。其他node轉入slave模式,連接master并同步他們的存儲狀態。如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網絡中并連接master進入slave mode。至于為什么是2-1,熟悉Zookeeper的應該知道,有一個node要作為觀擦者存在。
Shared Filesystem Master-Slave方式 shared filesystem Master-Slave部署方式主要是通過共享存儲目錄來實現master和slave的熱備,所有的ActiveMQ應用都在不斷地獲取共享目錄的控制權,哪個應用搶到了控制權,它就成為master。 多個共享存儲目錄的應用,誰先啟動,誰就可以最早取得共享目錄的控制權成為master,其他的應用就只能作為slave。 一、下載activeMQ解壓并復制3份二、修改配置文件conf下面 activemq.xml如下 ,3份文件broker都修改為一致 其中dataDirectory 是 數據存儲共享目錄地址修改 kahaDB目錄地址為數據存儲共享目錄地址修改openwire 的tcp 連接地址端口分別為 61616 61626 61636 二 、修改conf 下面的 jetty.xml文件,修改管理界面端口 8161 8162 8163 方便測試查看啟動的那一個activemq服務三、啟動 成功了一個后面的將阻塞等待獲取鎖
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver {public static void main(String[] args) {// ConnectionFactory :連接工廠,JMS 用它創建連接ConnectionFactory connectionFactory;// Connection :JMS 客戶端到JMS Provider 的連接Connection connection = null;// Session: 一個發送或接收消息的線程Session session;// Destination :消息的目的地;消息發送給誰.Destination destination;// 消費者,消息接收者MessageConsumer consumer;connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61626,tcp://localhost:61636)");try {// 構造從工廠得到連接對象connection = connectionFactory.createConnection();// 啟動connection.start();// 獲取操作連接session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);// 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置destination = session.createQueue("FirstQueue");consumer = session.createConsumer(destination);while (true) {//設置接收者接收消息的時間,為了便于測試,這里誰定為100sTextMessage message = (TextMessage) consumer.receive(100000);if (null != message) {System.out.println("收到消息" + message.getText());} else {break;}}} catch (Exception e) {e.printStackTrace();} finally {try {if (null != connection)connection.close();} catch (Throwable ignore) {}}}
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {private static final int SEND_NUMBER = 5;public static void main(String[] args) {// ConnectionFactory :連接工廠,JMS 用它創建連接ConnectionFactory connectionFactory;// Connection :JMS 客戶端到JMS Provider 的連接Connection connection = null;// Session: 一個發送或接收消息的線程Session session;// Destination :消息的目的地;消息發送給誰.Destination destination;// MessageProducer:消息發送者MessageProducer producer;// TextMessage message;// 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jarconnectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61626,tcp://localhost:61636)");try {// 構造從工廠得到連接對象connection = connectionFactory.createConnection();// 啟動connection.start();// 獲取操作連接session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);// 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置destination = session.createQueue("FirstQueue");// 得到消息生成者【發送者】producer = session.createProducer(destination);// 設置不持久化,此處學習,實際根據項目決定 ---- 集群此處必須持久化// producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 構造消息,此處寫死,項目就是參數,或者方法獲取sendMessage(session, producer);session.commit();} catch (Exception e) {e.printStackTrace();} finally {try {if (null != connection)connection.close();} catch (Throwable ignore) {}}}public static void sendMessage(Session session, MessageProducer producer)throws Exception {for (int i = 1; i <= SEND_NUMBER; i++) {TextMessage message = session.createTextMessage("ActiveMq 發送的消息" + i);// 發送消息到目的地方System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);producer.send(message);}}
}
?
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
以上是生活随笔為你收集整理的ActiveMQ的几种集群配置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java实现二分查找-两种方式
- 下一篇: 设计模式:装饰模式(Decorator)