RocketMQ的安装与配置
生活随笔
收集整理的這篇文章主要介紹了
RocketMQ的安装与配置
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
2019獨角獸企業重金招聘Python工程師標準>>>
RocketMQ的安裝與配置
RocketMQ的安裝
下載地址:https://github.com/apache/rocketmq
? incubator-rocketmq git:(master) pwd /Users/xinxingegeya/workspace-github/incubator-rocketmq ? incubator-rocketmq git:(master)下載的是 rocketmq的master分支。
然后執行mvn install,并指定-P參數(相應的profile)
? incubator-rocketmq git:(master) mvn clean package install -Prelease-all -Dmaven.test.skip=true -U使用-U參數:該參數能強制讓Maven檢查所有SNAPSHOT依賴更新,確保集成基于最新的狀態,如果沒有該參數,Maven默認以天為單位檢查更新,而持續集成的頻率應該比這高很多。
進入distribution/target目錄,
? incubator-rocketmq git:(master) cd distribution/target/apache-rocketmq ? apache-rocketmq git:(master) ll total 56 -rw-r--r-- 1 xinxingegeya staff 524B 2 23 14:40 DISCLAIMER -rw-r--r-- 1 xinxingegeya staff 15K 6 8 10:35 LICENSE -rw-r--r-- 1 xinxingegeya staff 1.3K 6 8 10:35 NOTICE -rw-r--r-- 1 xinxingegeya staff 2.4K 6 8 10:35 README.md drwxr-xr-x 36 xinxingegeya staff 1.2K 6 13 18:27 bin drwxr-xr-x 10 xinxingegeya staff 340B 6 8 10:35 conf drwxr-xr-x 25 xinxingegeya staff 850B 6 13 18:27 lib運行Name Server
? apache-rocketmq-all git:(master) ? nohup sh bin/mqnamesrv & [1] 43055 appending output to nohup.out ? apache-rocketmq-all git:(master) ? tail -f ~/logs/rocketmqlogs/namesrv.log 2017-05-15 14:07:28 INFO main - serverOnewaySemaphoreValue=256 2017-05-15 14:07:28 INFO main - serverAsyncSemaphoreValue=64 2017-05-15 14:07:28 INFO main - serverChannelMaxIdleTimeSeconds=120 2017-05-15 14:07:28 INFO main - serverSocketSndBufSize=4096 2017-05-15 14:07:28 INFO main - serverSocketRcvBufSize=4096 2017-05-15 14:07:28 INFO main - serverPooledByteBufAllocatorEnable=true 2017-05-15 14:07:28 INFO main - useEpollNativeSelector=false 2017-05-15 14:07:28 INFO main - load KV config table OK 2017-05-15 14:07:28 INFO NettyEventExecuter - NettyEventExecuter service started 2017-05-15 14:07:28 INFO main - The Name Server boot success. serializeType=JSON運行Broker
? apache-rocketmq-all git:(master) ? nohup sh bin/mqbroker -n localhost:9876 & [2] 43087 appending output to nohup.out ? apache-rocketmq-all git:(master) ? tail -f ~/logs/rocketmqlogs/broker.log 2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=FooBarGroup, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true] 2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=TOOLS_CONSUMER, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true] 2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONS-HTTP-PROXY, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true] 2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=FILTERSRV_CONSUMER, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true] 2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONSAPI_PULL, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true] 2017-05-15 14:08:25 INFO main - load /Users/xinxingegeya/store/config/subscriptionGroup.json OK 2017-05-15 14:08:25 INFO main - Set user specified name server address: localhost:9876 2017-05-15 14:08:25 INFO PullRequestHoldService - PullRequestHoldService service started 2017-05-15 14:08:26 INFO main - register broker to name server localhost:9876 OK 2017-05-15 14:08:26 INFO main - The broker[Yale-Li, 10.99.24.152:10911] boot success. serializeType=JSON and name server is localhost:9876 2017-05-15 14:08:35 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes 2017-05-15 14:08:35 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes 2017-05-15 14:08:36 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK關閉命令如下,
> sh bin/mqshutdown broker The mqbroker(36695) is running... Send shutdown request to mqbroker(36695) OK> sh bin/mqshutdown namesrv The mqnamesrv(36664) is running... Send shutdown request to mqnamesrv(36664) OK?
RocketMQ Console 的安裝
下載地址:https://github.com/apache/rocketmq-externals
mvn spring-boot:run執行該命令后,通過瀏覽器打開控制臺,默認端口是8080
?
簡單的 Producer 和 Consumer
生產者
package com.rocketmq.demo;import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper;/*** Created by xinxingegeya on 2017/5/13.*/ public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("mytest_producer_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();for (int i = 0; i < 1000; i++) {try {// Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicTest1" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);//Call send message to deliver message to one of brokers.SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}// Shut down once the producer instance is not longer in use.producer.shutdown();}}消費者
package com.rocketmq.demo;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** Created by xinxingegeya on 2017/5/15.*/ public class Consumer {/*** push模式是由consumer把輪詢過程封裝.* 并注冊MessageListener監聽器,取到消息后調用MessageListener的consumeMessage()來消費,* 對用戶而言,感覺消息是被推送過來的.** @param args* @throws MQClientException*/public static void main(String args[]) throws MQClientException {/*** 一個應用創建一個Consumer,由應用來維護此對象,可以設置為全局對象或者單例* 注意:ConsumerGroupName需要由應用來保證唯一*/DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mytest_consumer_group_name");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setInstanceName("Consumer");/*** 訂閱指定topic下tags分別等于TagA或TagC或TagD*/consumer.subscribe("TopicTest1", "TagA || TagC || TagD");/*** 訂閱指定topic下所有消息* 注意:一個consumer對象可以訂閱多個topic*/consumer.subscribe("TopicTest2", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {/*** 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息*/public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());MessageExt msg = msgs.get(0);if (msg.getTopic().equals("TopicTest1")) {// 執行TopicTest1的消費邏輯if (msg.getTags() != null && msg.getTags().equals("TagA")) {// 執行TagA的消費System.out.println(new String(msg.getBody()));} else if (msg.getTags() != null && msg.getTags().equals("TagC")) {// 執行TagC的消費} else if (msg.getTags() != null && msg.getTags().equals("TagD")) {// 執行TagD的消費}} else if (msg.getTopic().equals("TopicTest2")) {System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});/*** Consumer對象在使用之前必須要調用start初始化,初始化一次即可*/consumer.start();System.out.println("Consumer Started.");} }============END============
轉載于:https://my.oschina.net/xinxingegeya/blog/900162
總結
以上是生活随笔為你收集整理的RocketMQ的安装与配置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ipv6链路本地地址ping不通
- 下一篇: RocketMQ源码解析:Filters