ActiveMQ broker 集群, 静态发现和动态发现
生活随笔
收集整理的這篇文章主要介紹了
ActiveMQ broker 集群, 静态发现和动态发现
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
下載 activemq 壓縮包解壓后,conf 目錄下有各種示例配置文件,紅線標出的是靜態發現和動態發現的配置。
?
1. 靜態配置
啟動3個 broker,端口分別為61616,61618,61620,配置如下:
<networkConnectors></networkConnectors> <transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616"/> </transportConnectors> <networkConnectors><networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/> </networkConnectors> <transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61618"/> </transportConnectors> <networkConnectors><networkConnector uri="static:(tcp://localhost:61616,tcp://localhost:61618)" duplex="true"/> </networkConnectors> <transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61620"/> </transportConnectors>3個 broker 組成了一張網,當 producer 發送消息給 broker:61616 后,broker:61618 的消費者可以收到該消息。消息從 broker:61616 流動到 broker:61618,底層原理是 broker:61618 是 broker:61616 的一個消費者。
2. 動態配置
同樣地, 啟動3個 broker,端口分別為61616,61618,61620,配置如下:
<networkConnectors><networkConnector uri="multicast://default"/> </networkConnectors><transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="multicast://default" /> </transportConnectors> <networkConnectors><networkConnector uri="multicast://default"/> </networkConnectors> <transportConnectors><transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" /> </transportConnectors> <networkConnectors><networkConnector uri="multicast://default"/> </networkConnectors> <transportConnectors><transportConnector name="openwire" uri="tcp://0.0.0.0:61620" discoveryUri="multicast://default" /> </transportConnectors>使用多播協議,把 3 個 broker 動態地組成了一張網。
// 省略其他代碼 public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";public static final String DEFAULT_HOST_STR = "default"; public static final String DEFAULT_HOST_IP = System.getProperty("activemq.partition.discovery", "239.255.2.3"); public static final int DEFAULT_PORT = 6155; private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);private static final String TYPE_SUFFIX = "ActiveMQ-4.";private static final String ALIVE = "alive.";private static final String DEAD = "dead.";private static final String DELIMITER = "%";private static final int BUFF_SIZE = 8192;private static final int DEFAULT_IDLE_TIME = 500;private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;public void run() {byte[] buf = new byte[BUFF_SIZE];DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);while (started.get()) {// 發送多播數據 doTimeKeepingServices();try {// 接收多播數據 mcast.receive(packet);if (packet.getLength() > 0) {String str = new String(packet.getData(), packet.getOffset(), packet.getLength());processData(str);}} catch (SocketTimeoutException se) {// ignore} catch (IOException e) {if (started.get()) {LOG.error("failed to process packet: " + e);}}}} }?
3. broker 集群的原理
如上圖,61616和61618組成集群,當61618加入一個consumer時,61618向61616發送一條ConsumerInfo消息,這樣61618就成為了61616的consumer。
ConsumerInfo 示例:
ConsumerInfo {commandId = 4, responseRequired = false, consumerId = dynamic-broker1->dynamic-broker2-1872-1524494145961-2:1:1:1, destination = queue://TEST.BAT, prefetchSize = 1, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, clientId = ID:USER-20140617MT-1882-1524494166015-0:1, subscriptionName = null, noLocal = false, exclusive = true, retroactive = false, priority = -5, brokerPath = [ID:USER-20140617MT-1877-1524494148767-0:1], optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = org.apache.activemq.command.NetworkBridgeFilter@413249b }?
轉載于:https://www.cnblogs.com/allenwas3/p/8872822.html
總結
以上是生活随笔為你收集整理的ActiveMQ broker 集群, 静态发现和动态发现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: poj2586 Y2K Accou
- 下一篇: Jenkins远程命令执行漏洞(CVE-