RocketMQ: From Getting Started to Giving Up
Preface
RocketMQ is a lightweight data-processing and messaging platform. This article mainly covers RocketMQ's installation, configuration, and basic usage.
環(huán)境
以下是各個(gè)版本的RocketMQ對(duì)JDK版本的要求,本篇使用的是RocketMQ 4.6.1
(圖片來(lái)源:http://rocketmq.apache.org/dowloading/releases/)
Concepts
Roles
RocketMQ has roughly four roles: NameServer, Broker, Producer, and Consumer.
broker
The Broker is the core of RocketMQ:
- It receives messages from producers and delivers them to consumers
- It reports its own information to the Name Server
- It is RocketMQ's store-and-forward server
- On startup, each Broker node walks the Name Server list, establishes a long-lived connection to every Name Server, registers its own information, and then keeps reporting it periodically
If brokers are deployed as a cluster:
- For high availability, Brokers can be arranged in a Master/Slave structure: the Master is readable and writable, the Slave is read-only, and the Master replicates written data to the Slave
- One Master can have multiple Slaves, but each Slave belongs to exactly one Master
- The Master/Slave relationship is defined by sharing the same BrokerName with different BrokerIds: BrokerId 0 means Master, any non-zero value means Slave
- For load balancing, multiple Master brokers can be deployed
- Every broker keeps a long-lived connection to every node in the NameServer cluster and periodically registers its Topic information with all NameServers
producer
- The message producer
- It establishes a long-lived connection to one (randomly chosen) Name Server node to obtain a Topic's routing information: which Queues the Topic has and which Brokers those Queues live on
- It then opens long-lived connections to the Masters serving that Topic and sends them heartbeats periodically
consumer
- The message consumer
- It obtains a Topic's routing information from the Name Server cluster and connects to the corresponding Brokers to consume
- Since both Master and Slave can serve reads, the consumer establishes connections to both
Name Server
- Built on Netty, it provides route management, service registration, and service discovery; it is a stateless node
- The Name Server is the service registry: every role in the cluster (Broker, Producer, Consumer) must report its status periodically so the others can discover it; a node that fails to report within the timeout is removed from the Name Server's list
- Multiple Name Servers can be deployed; the other roles then report their status to all of them simultaneously, which provides high availability
- Name Servers do not communicate with each other, and there is no master/backup relationship among them
- The Name Server keeps everything in memory; Broker, Topic, and similar information is not persisted by default
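The heartbeat-and-eviction bookkeeping described above can be sketched with a plain map from broker address to last-report time. This is a simplified illustration, not RocketMQ's code (the real logic lives in the Name Server's RouteInfoManager, which by default treats a broker as dead after 120 seconds of silence):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the Name Server's liveness table: purely in-memory, entries are
// evicted when a broker stops reporting. Names here are illustrative.
public class BrokerLivenessTable {
    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    public BrokerLivenessTable(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Every registration/report refreshes the broker's timestamp.
    public void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Called periodically; removes brokers that have not reported in time
    // and returns the set of evicted addresses.
    public Set<String> evictExpired(long nowMillis) {
        Set<String> evicted = new HashSet<>();
        lastHeartbeat.entrySet().removeIf(e -> {
            boolean expired = nowMillis - e.getValue() > timeoutMillis;
            if (expired) evicted.add(e.getKey());
            return expired;
        });
        return evicted;
    }

    public boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```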
Installation
```shell
## download
wget https://archive.apache.org/dist/rocketmq/4.6.1/rocketmq-all-4.6.1-bin-release.zip
## unzip
unzip rocketmq-all-4.6.1-bin-release.zip
```
Enter the bin directory of the unpacked RocketMQ distribution:
```shell
[root@localhost rocketmq-all-4.6.1-bin-release]# cd bin/
[root@localhost bin]# ll
total 108
-rwxr-xr-x 1 root root 1654 Nov 28 2019 cachedog.sh
-rwxr-xr-x 1 root root  845 Nov 28 2019 cleancache.sh
-rwxr-xr-x 1 root root 1116 Nov 28 2019 cleancache.v1.sh
drwxr-xr-x 2 root root   25 Nov 28 2019 dledger
-rwxr-xr-x 1 root root 1398 Nov 28 2019 mqadmin
-rwxr-xr-x 1 root root 1029 Nov 28 2019 mqadmin.cmd
-rwxr-xr-x 1 root root 1394 Nov 28 2019 mqbroker
-rwxr-xr-x 1 root root 1084 Nov 28 2019 mqbroker.cmd
-rwxr-xr-x 1 root root 1373 Nov 28 2019 mqbroker.numanode0
-rwxr-xr-x 1 root root 1373 Nov 28 2019 mqbroker.numanode1
-rwxr-xr-x 1 root root 1373 Nov 28 2019 mqbroker.numanode2
-rwxr-xr-x 1 root root 1373 Nov 28 2019 mqbroker.numanode3
-rwxr-xr-x 1 root root 1396 Nov 28 2019 mqnamesrv
-rwxr-xr-x 1 root root 1088 Nov 28 2019 mqnamesrv.cmd
-rwxr-xr-x 1 root root 1571 Nov 28 2019 mqshutdown
-rwxr-xr-x 1 root root 1398 Nov 28 2019 mqshutdown.cmd
-rwxr-xr-x 1 root root 2222 Nov 28 2019 os.sh
-rwxr-xr-x 1 root root 1148 Nov 28 2019 play.cmd
-rwxr-xr-x 1 root root 1008 Nov 28 2019 play.sh
-rwxr-xr-x 1 root root  772 Nov 28 2019 README.md
-rwxr-xr-x 1 root root 2206 Nov 28 2019 runbroker.cmd
-rwxr-xr-x 1 root root 3713 Dec 24 2019 runbroker.sh
-rwxr-xr-x 1 root root 1816 Nov 28 2019 runserver.cmd
-rwxr-xr-x 1 root root 3397 Dec 24 2019 runserver.sh
-rwxr-xr-x 1 root root 1156 Nov 28 2019 setcache.sh
-rwxr-xr-x 1 root root 1408 Nov 28 2019 startfsrv.sh
-rwxr-xr-x 1 root root 1634 Dec 24 2019 tools.cmd
-rwxr-xr-x 1 root root 1901 Dec 24 2019 tools.sh
```
The two scripts we need to start are mqnamesrv and mqbroker.
First start mqnamesrv; you will see a log like the following:
```shell
[root@localhost bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000006c0000000, 2147483648, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 2147483648 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/rocketmq-all-4.6.1-bin-release/bin/hs_err_pid2154.log
```
Startup failed. The first line warns that using the DefNew young collector together with the CMS collector is deprecated and will be removed in a future release; the second warns that the CMS parameter UseCMSCompactAtFullCollection will likewise be removed.
Neither warning is the cause of the failure, though. The real cause is on the third line: error='Cannot allocate memory' (errno=12). The first half of that line shows the JVM tried to commit 2147483648 bytes (2 GB) of memory, but the virtual machine used in this article only has 1 GB of RAM, so the process could not start.
We therefore need to change the startup memory settings. Look at the mqnamesrv script:
```shell
if [ -z "$ROCKETMQ_HOME" ] ; then
  ## resolve links - $0 may be a link to maven's home
  PRG="$0"

  # need this for relative symlinks
  while [ -h "$PRG" ] ; do
    ls=`ls -ld "$PRG"`
    link=`expr "$ls" : '.*-> \(.*\)$'`
    if expr "$link" : '/.*' > /dev/null; then
      PRG="$link"
    else
      PRG="`dirname "$PRG"`/$link"
    fi
  done

  saveddir=`pwd`

  ROCKETMQ_HOME=`dirname "$PRG"`/..

  # make it fully qualified
  ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`

  cd "$saveddir"
fi

export ROCKETMQ_HOME

sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@
```
The script itself sets no memory parameters, but its last line invokes another script, runserver.sh, so that is the one to edit. Opening runserver.sh, you will find the JVM memory parameters (the -Xms/-Xmx/-Xmn settings).
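For reference, the heap line in runserver.sh looks roughly like this in 4.6.1 (exact defaults may vary between versions, and the smaller values below are demo choices for a 1 GB VM, not official recommendations):

```shell
# Default in runserver.sh (RocketMQ 4.6.1): 4 GB heap with a 2 GB young generation
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

# A smaller setting that fits a 1 GB VM (tune to your machine):
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```

The 2 GB young generation (-Xmn2g) is exactly the 2147483648-byte allocation that failed in the log above.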
把相關(guān)的參數(shù)改成合適的即可。
Start mqnamesrv again and you should see:
```shell
[root@localhost bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
```
The Name Server is now running.
Likewise, starting mqbroker requires adjusting the corresponding memory parameters in the same way (this time in runbroker.sh). Once it starts successfully you will see:
```shell
[root@localhost bin]# ./mqbroker -n 192.168.1.101:9876 autoCreateTopicEnable=true
The broker[localhost.localdomain, 192.168.1.101:10911] boot success. serializeType=JSON and name server is 192.168.1.101:9876
```
The -n option specifies which Name Server to connect to, and autoCreateTopicEnable=true tells the broker to create a Topic automatically when it does not exist yet.
測(cè)試
同步消息
這種方式發(fā)送的消息是可靠的消息,消息發(fā)送中進(jìn)入同步等待狀態(tài),可以保證消息投遞一定到達(dá)
Producer code
```java
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * @author sicimike
 * @create 2020-07-27 22:36
 */
public class SicimikeProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
        // set the Name Server address
        producer.setNamesrvAddr("192.168.1.101:9876");
        // start the producer
        producer.start();
        // build the message
        Message message = new Message("hello-topic", "Hello RocketMQ".getBytes());
        // send a synchronous message
        SendResult sendResult = producer.send(message);
        System.out.println("sendResult is : " + sendResult);
        // close the connection
        producer.shutdown();
    }
}
```
Consumer code
```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @author sicimike
 * @create 2020-07-27 22:37
 */
public class SicimikeConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello-group-consumer");
        // set the Name Server address
        consumer.setNamesrvAddr("192.168.1.101:9876");
        // subscribe to the topic; the second argument is a message filter, "*" means no filtering
        consumer.subscribe("hello-topic", "*");
        // register the message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    // several messages may arrive in one call
                    System.out.println("message is : " + new String(messageExt.getBody()));
                }
                // report the messages as successfully consumed
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // start the consumer
        consumer.start();
    }
}
```
Batch messages
RocketMQ can pack multiple messages into a single send, cutting down network round-trips and improving throughput.
Producer code
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class SicimikeProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
        producer.setNamesrvAddr("192.168.1.101:9876");
        producer.start();
        Message message1 = new Message("hello-topic", "Hello RocketMQ-1".getBytes());
        Message message2 = new Message("hello-topic", "Hello RocketMQ-2".getBytes());
        Message message3 = new Message("hello-topic", "Hello RocketMQ-3".getBytes());
        List<Message> messageList = new ArrayList<>();
        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);
        // send the whole list in one call
        SendResult sendResult = producer.send(messageList);
        System.out.println("sendResult is : " + sendResult);
        producer.shutdown();
    }
}
```
Sending batch messages comes with a few restrictions:
- All messages in a batch must share the same topic and the same message configuration
- Delayed messages are not supported in a batch
- A single batch should stay under roughly 1 MB
- If you are unsure whether a batch exceeds the limit, compute the sizes yourself and split the list into smaller batches
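The last point can be implemented with a simple size-based splitter. RocketMQ's official batch example uses a ListSplitter over Message objects with a 1 MB limit and a 20-byte per-message overhead; the sketch below applies the same logic to raw payloads so that it stays self-contained (treat the limit and overhead values as assumptions to tune):

```java
import java.util.ArrayList;
import java.util.List;

// Size-based batch splitter, modeled on the ListSplitter from RocketMQ's
// official batch example but operating on raw payloads so it runs standalone.
public class BatchSplitter {
    // Split payloads into batches whose summed size stays within limitBytes.
    // perMessageOverhead approximates the headers/properties the broker adds.
    public static List<List<byte[]>> split(List<byte[]> payloads, int limitBytes, int perMessageOverhead) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] p : payloads) {
            int size = p.length + perMessageOverhead;
            // flush the current batch when adding this payload would overflow it
            if (!current.isEmpty() && currentSize + size > limitBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(p); // an oversized single payload still goes out alone
            currentSize += size;
        }
        if (!current.isEmpty()) batches.add(current);
        return batches;
    }
}
```

Each resulting sub-list can then be passed to `producer.send(...)` as its own batch.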
Asynchronous messages
Asynchronous sending is typically used in latency-sensitive business scenarios: the send call returns immediately and the result is delivered to a callback.
```java
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class SicimikeProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
        producer.setNamesrvAddr("192.168.1.101:9876");
        producer.start();
        // number of retries after a failed asynchronous send (not counting the first attempt)
        producer.setRetryTimesWhenSendAsyncFailed(0);
        Message message = new Message("hello-topic", "Hello RocketMQ asynchronous".getBytes());
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // invoked after a successful send
                System.out.println("sendResult is : " + sendResult);
                // close the connection (fine for a demo; a real application would keep the producer running)
                producer.shutdown();
            }

            @Override
            public void onException(Throwable e) {
                // invoked when the send fails with an exception
                System.out.println("Exception : " + e);
            }
        });
    }
}
```
One-way messages
One-way messages suit cases that need only moderate reliability, such as log collection. Because a one-way send neither waits for nor checks the result, it is the fastest option.
```java
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class SicimikeProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
        producer.setNamesrvAddr("192.168.1.101:9876");
        producer.start();
        Message message = new Message("hello-topic", "Hello RocketMQ One way".getBytes());
        // send a one-way message: no result is returned
        producer.sendOneway(message);
        // give the client a moment to flush the request before releasing resources
        Thread.sleep(1000);
        producer.shutdown();
    }
}
```
Broadcast messages
Unlike the modes above, whether a message is broadcast is decided by the consumer, via consumer.setMessageModel(...):
- Clustering mode (MessageModel.CLUSTERING, the default): within one consumer Group, each message is consumed by only one Consumer
- Broadcasting mode (MessageModel.BROADCASTING): every Consumer subscribed to the Topic consumes every message
事務(wù)消息
事務(wù)消息可以將其視為兩階段提交消息實(shí)現(xiàn),以確保分布式系統(tǒng)中的最終一致性。事務(wù)性消息可確保本地事務(wù)的執(zhí)行和消息的發(fā)送可以原子方式執(zhí)行。
如圖所示,是RocketMQ發(fā)送事務(wù)消息時(shí)的流程,RocketMQ首先會(huì)發(fā)送Half Message到Broker的Half Queue,這種消息是不能被消費(fèi)的,發(fā)送成功后,再執(zhí)行本地事務(wù),如果本地事務(wù)正常提交,RockerMQ才會(huì)把Half Message轉(zhuǎn)換成正常的Message消息發(fā)送出去,如果本地事務(wù)回滾,則Half Message會(huì)被刪除。如果在超時(shí)時(shí)間之內(nèi),RocketMQ既沒(méi)有收到回滾,也沒(méi)有收到提交,則會(huì)定時(shí)檢查本地事務(wù)的狀態(tài),來(lái)確定消息是應(yīng)該被提交還是回滾。
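The flow above condenses into a small state machine. The code below is a simulation of the protocol for illustration only, not the RocketMQ client API: a half message becomes visible to consumers only on COMMIT, disappears on ROLLBACK, and an UNKNOWN local-transaction state signals that the broker must check back later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of the two-phase ("half message") transactional flow described
// above; purely illustrative, not RocketMQ's API.
public class HalfMessageQueue {
    public enum LocalTxState { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, String> halfMessages = new HashMap<>();   // invisible to consumers
    private final List<String> consumableMessages = new ArrayList<>();  // visible to consumers

    // Phase 1: the half message is stored but cannot be consumed yet.
    public void putHalfMessage(String txId, String body) {
        halfMessages.put(txId, body);
    }

    // Phase 2: the producer reports its local transaction outcome.
    // Returns true if the broker still needs to check back later.
    public boolean endTransaction(String txId, LocalTxState state) {
        switch (state) {
            case COMMIT:
                consumableMessages.add(halfMessages.remove(txId)); // now deliverable
                return false;
            case ROLLBACK:
                halfMessages.remove(txId); // half message discarded
                return false;
            default:
                return true; // UNKNOWN: broker re-checks the local transaction later
        }
    }

    public List<String> consumable() { return consumableMessages; }
    public boolean hasPendingHalf(String txId) { return halfMessages.containsKey(txId); }
}
```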
Ordered messages
RocketMQ provides ordered messages with FIFO semantics; ordering comes in two flavors: globally ordered and partition-ordered (ordered within a queue).
A message flow is only truly ordered when it is ordered both at send time and at consume time.
In RocketMQ, every Topic has several Queues, and each Queue is FIFO. So to make sending ordered, you only need to route the related messages to the same Queue, and RocketMQ provides an API for exactly that.
Producer code
```java
import java.util.List;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class SicimikeProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
        producer.setNamesrvAddr("192.168.1.101:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            // all messages go to the same Topic
            Message message = new Message("hello-topic", ("Hello RocketMQ Order " + i).getBytes());
            // MessageQueueSelector picks the target Queue
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    // choose a Queue based on the external argument
                    return mqs.get(id % mqs.size());
                }
            }, 1234);
            System.out.println("sendResult is : " + sendResult);
        }
    }
}
```
Ordered consumption on the consumer side is just as simple; RocketMQ provides an API for that as well.
Consumer code
```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class SicimikeConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello-group-consumer");
        consumer.setNamesrvAddr("192.168.1.101:9876");
        consumer.subscribe("hello-topic", "*");
        // MessageListenerOrderly enables ordered consumption
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println(Thread.currentThread().getName() + " -> " + new String(messageExt.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}
```
Consumer output
```
ConsumeMessageThread_1 -> Hello RocketMQ Order 0
ConsumeMessageThread_1 -> Hello RocketMQ Order 1
ConsumeMessageThread_1 -> Hello RocketMQ Order 2
ConsumeMessageThread_1 -> Hello RocketMQ Order 3
ConsumeMessageThread_1 -> Hello RocketMQ Order 4
ConsumeMessageThread_1 -> Hello RocketMQ Order 5
ConsumeMessageThread_1 -> Hello RocketMQ Order 6
ConsumeMessageThread_1 -> Hello RocketMQ Order 7
ConsumeMessageThread_1 -> Hello RocketMQ Order 8
ConsumeMessageThread_1 -> Hello RocketMQ Order 9
```
MessageListenerOrderly dedicates one consuming thread to each Queue, so the messages within a single Queue are consumed in order.
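The "one thread per queue" behavior can be mimicked with plain executors: routing each queue id to its own single-threaded executor guarantees FIFO processing within a queue while different queues proceed in parallel. This is a sketch of the idea, not RocketMQ's internals.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of per-queue ordered consumption: one single-threaded executor per
// queue id keeps each queue FIFO; distinct queues run concurrently.
public class PerQueueDispatcher {
    private final Map<Integer, ExecutorService> executors = new ConcurrentHashMap<>();
    private final Map<Integer, List<String>> processed = new ConcurrentHashMap<>();

    public void dispatch(int queueId, String message) {
        executors
            .computeIfAbsent(queueId, id -> Executors.newSingleThreadExecutor())
            .submit(() -> processed
                .computeIfAbsent(queueId, id -> Collections.synchronizedList(new ArrayList<>()))
                .add(message)); // single thread per queue => FIFO within the queue
    }

    public List<String> processed(int queueId) {
        return processed.getOrDefault(queueId, List.of());
    }

    public void shutdown() {
        for (ExecutorService e : executors.values()) {
            e.shutdown();
            try {
                e.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```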
以下實(shí)例可以驗(yàn)證,在Producer中開(kāi)啟兩個(gè)線程進(jìn)行寫(xiě)入,分別把消息寫(xiě)到不同的Queue。
```java
import java.util.List;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class SicimikeProducer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
        producer.setNamesrvAddr("192.168.1.101:9876");
        producer.start();
        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    Message message = new Message("hello-topic", ("Hello RocketMQ Order 1234-" + i).getBytes());
                    SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            return mqs.get(id % mqs.size());
                        }
                    }, 1234);
                    System.out.println(Thread.currentThread().getName() + " -> sendResult is : " + sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace(); // don't swallow errors silently
            }
        }).start();
        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    Message message = new Message("hello-topic", ("Hello RocketMQ Order 4321-" + i).getBytes());
                    SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            return mqs.get(id % mqs.size());
                        }
                    }, 4321);
                    System.out.println(Thread.currentThread().getName() + " -> sendResult is : " + sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace(); // don't swallow errors silently
            }
        }).start();
    }
}
```
Output
```
Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863AE0001, offsetMsgId=C0A8016500002A9F0000000000013EA1, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=115]
Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863AE0000, offsetMsgId=C0A8016500002A9F0000000000013DBD, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=18]
Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863BD0003, offsetMsgId=C0A8016500002A9F0000000000013F85, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=19]
Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863BD0002, offsetMsgId=C0A8016500002A9F0000000000014069, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=116]
...
```
The output shows that each thread's writes are ordered within its own Queue: Thread-6's messages all land in queueId=2 with queueOffset 115, 116, 117, ... and Thread-7's all land in queueId=1 with queueOffset 18, 19, 20, ... Now the consumer:
```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class SicimikeConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello-group-consumer");
        consumer.setNamesrvAddr("192.168.1.101:9876");
        consumer.subscribe("hello-topic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println(Thread.currentThread().getName() + " -> " + messageExt.toString());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}
```
Output
```
ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=115, sysFlag=0, bornTimestamp=1596467602352, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602468, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000013EA1, commitLogOffset=81569, bodyCRC=194455642, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863AE0001, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 48], transactionId='null'}]
ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=18, sysFlag=0, bornTimestamp=1596467602352, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602467, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000013DBD, commitLogOffset=81341, bodyCRC=1481455063, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863AE0000, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 48], transactionId='null'}]
...
```
The output confirms that consumption within each Queue is ordered: ConsumeMessageThread_2 handles queueId=2 and ConsumeMessageThread_1 handles queueId=1, and within each Queue the queueOffset increases monotonically.
Summary
This article covered RocketMQ's installation, core concepts, and basic usage.