ActiveMQ集群
1.ActiveMQ集群介紹
1.為什么要集群?
實現高可用,以排除單點故障引起的服務中斷
實現負載均衡,以提升效率為更多客戶提供服務
2.集群方式
客戶端集群:讓多個消費者消費同一個隊列
Broker Cluster:多個Broker之間同步消息(做不了高可用,可以實現負載均衡)
Master-Slave:高可用(做不了負載均衡)
3.ActiveMq失效轉移
允許當其中一臺消息服務器宕機時,客戶端在傳輸層上重新連接其他消息服務器。
語法:failover:(uri1,...uriN)?transportOptions
transportOptions參數說明:randomize:默認為true,表示在URI列表中選擇URI連接時是否采用隨機策略
initialReconnectDelay默認為10,默認10毫秒,表示第一次嘗試重連之間等待的時間。
maxReconnectDelay:默認30000,單位毫秒,最長重連時間的間隔
?4.Master/Slave集群配置
Share nothing storage master/slave(已經過時,5.8之后移除)
shared storage master/slave 共享存儲(實際上是共享同一文件夾,只不過采用排他鎖,所以只有一個master節點可以訪問,當此服務宕機,另一臺slave可以快速強占排他鎖,所以不會造成數據丟失,使用同一文件夾下的東西。如果多態服務器的話需要搭建文件共享服務器)
Replicated LevelDB Store 基于復制的LevelDB Store
1.共享存儲原理:(獲取排他鎖才可以提供消息服務)--簡單方式
?2.Replicated LevelDB Store 基于復制的LevelDB Store原理(基于zookeper)
3.兩種方式對比:
| ? | 高可用 | 負載均衡 |
| Master/Slave | 是 | 否 |
| Broker Cluster | 否 | 是 |
?
4.三臺機器的完美集群方案:(實現高可用和負載均衡)
?
2.ActiveMQ集群配置
1.方案介紹
?方案如下: B與C采用master-slave共享文件夾存儲(任一時刻只有一個可以占有排他鎖,也就是只有一個可以提供服務),當A宕機之后B會獲取資源鎖提供服務---實現高可用
? ?A與B、A與C都采用Broker 集群,可以同時提供服務,不管B與C誰獲取鎖,A都可以提供服務(A只能提供服務消費消息,不生成消息)---實現負載均衡
?
配置如下:(同一個電腦不同端口模擬集群)
?
?2.配置文件?(修改的配置文件都是在apache-activemq-5.15.6\conf文件夾下)
?
?
(1)activemq-a下面的配置:---A服務器
activemq.xml:(注釋掉其他協議,服務端口使用61616端口;增加靜態網絡連接器,同時連接B與C)
?jetty.xml:? 采用默認的8161端口
?
(2)activemq-b下面的配置:---B服務器
?activemq.xml:(增加與A的靜態連接器,修改服務端口采用61617,修改共享文件夾的地址)
?
?
?
jitty.xml:修改端口采用8162
?
?
(3)activemq-c下面的配置:---C服務器
?activemq.xml:(增加與A的靜態連接器,修改服務端口采用61618,修改共享文件夾的地址)
?
?
jitty.xml:修改端口采用8163
?
3.依次啟動ActiveMq進行測試?
啟動順序是:A->B->C,下面通過端口信息驗證集群:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496TCP [::]:8161 [::]:0 LISTENING 2423496liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162TCP 0.0.0.0:8162 0.0.0.0:0 LISTENING 2423756TCP [::]:8162 [::]:0 LISTENING 2423756 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496TCP 127.0.0.1:55338 127.0.0.1:61616 ESTABLISHED 2423756TCP 127.0.0.1:61616 127.0.0.1:55338 ESTABLISHED 2423496TCP [::]:61616 [::]:0 LISTENING 2423496liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617TCP 0.0.0.0:61617 0.0.0.0:0 LISTENING 2423756TCP 127.0.0.1:55345 127.0.0.1:61617 ESTABLISHED 2423496TCP 127.0.0.1:61617 127.0.0.1:55345 ESTABLISHED 2423756TCP [::]:61617 [::]:0 LISTENING 2423756 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618由于先啟動的B,B占有排斥鎖,所以B(8162-61617)處于監聽狀態,而C與B采用共享文件排他鎖集群,所以C處于阻塞狀態,也就是沒有監聽端口,被阻塞。
?
停掉B服務器,模擬B服務器宕機的情況,再次查看端口:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496TCP [::]:8161 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163TCP 0.0.0.0:8163 0.0.0.0:0 LISTENING 2423848TCP [::]:8163 [::]:0 LISTENING 2423848 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496TCP 127.0.0.1:56704 127.0.0.1:61616 ESTABLISHED 2423848TCP 127.0.0.1:61616 127.0.0.1:56704 ESTABLISHED 2423496TCP [::]:61616 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618TCP 0.0.0.0:61618 0.0.0.0:0 LISTENING 2423848TCP 127.0.0.1:56791 127.0.0.1:61618 ESTABLISHED 2423496TCP 127.0.0.1:61618 127.0.0.1:56791 ESTABLISHED 2423848TCP [::]:61618 [::]:0 LISTENING 2423848 liqiang@root MINGW64 ~/Desktop由于B宕機,所以C會強占排他鎖,也就是會監聽端口,所以可以看到8163與61618端口的監聽狀態。
?
現在模擬C也宕機,將C服務器也停掉。再次查看端口信息:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496TCP [::]:8161 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496TCP [::]:61616 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617TCP 127.0.0.1:57242 127.0.0.1:61617 SYN_SENT 2423496liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618liqiang@root MINGW64 ~/Desktop?
此時A服務器仍然在監聽端口。
上面證明集群搭建成功,下面重新開啟A->B->C服務器開始程序驗證。
4.程序驗證集群
服務器開啟順序性:A->B->C(此時B占有排他鎖,B監聽端口,A一直監聽)
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2428232TCP 127.0.0.1:60520 127.0.0.1:61616 ESTABLISHED 2427192TCP 127.0.0.1:61616 127.0.0.1:60520 ESTABLISHED 2428232TCP [::]:61616 [::]:0 LISTENING 2428232 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617TCP 0.0.0.0:61617 0.0.0.0:0 LISTENING 2427192TCP 127.0.0.1:60513 127.0.0.1:61617 ESTABLISHED 2428232TCP 127.0.0.1:61617 127.0.0.1:60513 ESTABLISHED 2427192TCP [::]:61617 [::]:0 LISTENING 2427192 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2428232TCP [::]:8161 [::]:0 LISTENING 2428232 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162TCP 0.0.0.0:8162 0.0.0.0:0 LISTENING 2427192TCP [::]:8162 [::]:0 LISTENING 2427192 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163?
代碼測試:
生產者:? url加了失效策略,而且采用隨機選取,生產消息的地址只有B與C服務器地址
package cn.qlq.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 生產消息* * @author QiaoLiQiang* @time 2018年9月18日下午11:04:41*/ public class MsgProducer {private static final String url = "failover:(tcp://localhost:61617,tcp://localhost:61618)?randomize=true";private static final String queueName = "myQueue";public static void main(String[] args) throws JMSException {// 1創建ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);// 2.由connectionFactory創建connectionConnection connection = connectionFactory.createConnection();// 3.啟動connection connection.start();// 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.創建Destination(Queue繼承Queue)Queue destination = session.createQueue(queueName);// 6.創建生產者producerMessageProducer producer = session.createProducer(destination);for (int i = 0; i < 100; i++) {// 7.創建Message,有好多類型,這里用最簡單的TextMessageTextMessage tms = session.createTextMessage("textMessage:" + i);// 8.生產者發送消息 producer.send(tms);System.out.println("send:" + tms.getText());}// 9.關閉connection connection.close();}}?
消費者:url加了失效策略,而且采用隨機選取,生產消息的地址有ABC服務器
package cn.qlq.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 消費消息* * @author QiaoLiQiang* @time 2018年9月18日下午11:26:41*/ public class MsgConsumer {private static final String url = "failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?randomize=true";private static final String queueName = "myQueue";public static void main(String[] args) throws JMSException {// 1創建ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);// 2.由connectionFactory創建connectionConnection connection = connectionFactory.createConnection();// 3.啟動connection connection.start();// 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.創建Destination(Queue繼承Queue)Queue destination = session.createQueue(queueName);// 6.創建消費者consumerMessageConsumer consumer = session.createConsumer(destination);// 7.給消費者綁定監聽器(消息的監聽是一個異步的過程,不可以關閉連接,綁定監聽器線程是一直開啟的,處于阻塞狀態,所以可以在程序退出關閉)consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {// 7.1由于消費者接受的是TextMessage,所以強轉一下TextMessage tms = (TextMessage) message;try {System.out.println("接收消息:" + tms.getText());} catch (JMSException e) {e.printStackTrace();}}});}}?
啟動生產者發布100條消息:
管理界面查看:(消息被發布在B服務器,B占有排他鎖,C處于阻塞)
?
?A服務不生產消息所有A不會消費消息:(通過A查看網絡連接器)
?
關掉B服務器之后查看C服務器:(驗證B與C共享同一文件夾,且強占排他鎖)
?
查看共享文件夾:
?
?此時A與C處于監聽狀態,啟動消費者:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2431676TCP [::]:8161 [::]:0 LISTENING 2431676 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163TCP 0.0.0.0:8163 0.0.0.0:0 LISTENING 2432188TCP [::]:8163 [::]:0 LISTENING 2432188TCP [::1]:8163 [::1]:63600 TIME_WAIT 0TCP [::1]:8163 [::1]:63648 TIME_WAIT 0TCP [::1]:8163 [::1]:63649 TIME_WAIT 0TCP [::1]:8163 [::1]:63650 TIME_WAIT 0TCP [::1]:8163 [::1]:63651 TIME_WAIT 0TCP [::1]:8163 [::1]:63652 TIME_WAIT 0?
查看控制臺如下:連接到A服務器消費信息
?
?總結:上面的配置方案B與C是為了實現高可用,也就是一臺宕機之后另一臺馬上強占排他鎖提供服務(需要共享文件夾實現共用同一文件夾下的資源與鎖),B與C可以生產消息,也可以提供消費消息,但是同一時刻只有一個提供服務。
? ?提供A是為了與B、C實現負載均衡,A不生產消息,但是可以消費消息,替B或者C分擔壓力。
?
轉載于:https://www.cnblogs.com/qlqwjy/p/9728425.html
總結
以上是生活随笔為你收集整理的ActiveMQ集群的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Windows安装及使用sqlmap
- 下一篇: JS 动态创建元素、删除元素、替换元素、