RocketMQ入门到入土(一)新手也能看懂的原理和实战!
點擊上方?好好學java?,選擇?星標?公眾號
重磅資訊、干貨,第一時間送達今日推薦:硬剛一周,3W字總結,一年的經驗告訴你如何準備校招!
個人原創100W+訪問量博客:點擊前往,查看更多學任何技術都是兩步驟:
搭建環境
helloworld
我也不例外,直接搞起來。
一、RocketMQ的安裝
1、文檔
官方網站
http://rocketmq.apache.org
GitHub
https://github.com/apache/rocketmq
2、下載
wget?https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip我們是基于Centos8來的,面向官方文檔學習,所以下載地址自然也是官方的。
去官方網站找合適的版本進行下載,目前我這里最新的是4.7.0版本。
http://rocketmq.apache.org/dowloading/releases/
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip
3、準備工作
3.1、解壓
unzip?rocketmq-all-4.7.0-bin-release.zip3.2、安裝jdk
sudo?yum?install?java-1.8.0-openjdk-devel4、啟動
4.1、啟動namesrv
cd?rocketmq-all-4.7.0-bin-release/bin ./mqnamesrv4.2、啟動broker
cd?rocketmq-all-4.7.0-bin-release/bin ./mqbroker?-n?localhost:9876常見錯誤以及解決方案:
常見錯誤:啟動broker失敗?Cannot allocate memory
[root@node-113b?bin]#?./mqbroker?-n?localhost:9876 Java?HotSpot(TM)?64-Bit?Server?VM?warning:?INFO:?os::commit_memory(0x00000005c0000000,?8589934592,?0)?failed ;?error='Cannot?allocate?memory'?(errno=12)# #?There?is?insufficient?memory?for?the?Java?Runtime?Environment?to?continue. #?Native?memory?allocation?(mmap)?failed?to?map?8589934592?bytes?for?committing?reserved?memory. #?An?error?report?file?with?more?information?is?saved?as: #?/usr/local/rocketmq/bin/hs_err_pid1997.log解決方案:
是由于默認內存分配的太大了,超出了本機內存,直接OOM了。
修改bin/目錄下的如下兩個腳本
runbroker.sh runserver.sh在這兩個腳本里都搜索-server -Xms,將其內存分配小點,自己玩的話512MB就足夠了,夠夠的了!
4.3、啟動成功標識
namesrv啟動成功標識:
broker啟動成功標識:
二、RocketMQ控制臺的安裝
控制臺目前獲取方式有如下兩種:
第三方網站去下載現成的,比如csdn等。
官方源碼包自己編譯而成,官方沒有現成的。
我們這里當然采取官方方式。
1、官方文檔
github倉庫
https://github.com/apache/rocketmq-externals
中文指南
https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md
2、下載源碼
https://codeload.github.com/apache/rocketmq-externals/zip/master
3、修改配置(可選)
我們下載完解壓后的文件目錄如下:
修改rocketmq-console\src\main\resources\application.properties文件的server.port就歐了。默認8080。
4、編譯打包
進入rocketmq-console,然后用maven進行編譯打包
mvn?clean?package?-DskipTests打包完會在target下生成我們spring boot的jar程序,直接java -jar啟動完事。
5、啟動控制臺
將編譯打包好的springboot程序扔到服務器上,執行如下命令進行啟動
java?-jar?rocketmq-console-ng-1.0.1.jar?--rocketmq.config.namesrvAddr=127.0.0.1:9876如果想后臺啟動就nohup &
訪問一下看看效果:
三、測試
rocketmq給我們提供了測試工具和測試類,可以在安裝完很方便的進行測試。
0、準備工作
rocketmq給我們提供的默認測試工具在bin目錄下,叫tools.sh。我們測試前需要配置這個腳本,為他指定namesrv地址才可以,否則測試發送/消費消息的時候會出現如下錯誤?connect to null failed:
22:49:02.470?[main]?DEBUG?i.n.u.i.l.InternalLoggerFactory?-?Using?SLF4J?as?the?default?logging?framework RocketMQLog:WARN?No?appenders?could?be?found?for?logger?(io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN?Please?initialize?the?logger?system?properly. java.lang.IllegalStateException:?org.apache.rocketmq.remoting.exception.RemotingConnectException:?connect?to?null?failed配置如下:
vim?tools.sh #?在export?JAVA_HOME上面添加如下這段代碼 export?NAMESRV_ADDR=localhost:98761、發送消息
./tools.sh?org.apache.rocketmq.example.quickstart.Producer成功的話會看到嘩嘩嘩的日志,因為這個類會發送1000條消息到TopicTest這個Topic下。
2、消費消息
./tools.sh?org.apache.rocketmq.example.quickstart.Consumer成功的話會看到嘩嘩嘩的日志,因為這個類會消費TopicTest下的全部消息。剛發送的1000條都會被消費掉。
3、控制臺
發送成功后我們自然也能來到管控臺去看消息和消費情況等等等信息
四、架構圖以及角色
1、架構圖
2、角色
2.1、Broker
-
理解成RocketMQ本身
-
broker主要用于producer和consumer接收和發送消息
-
broker會定時向nameserver提交自己的信息
-
是消息中間件的消息存儲、轉發服務器
-
每個Broker節點,在啟動時,都會遍歷NameServer列表,與每個NameServer建立長連接,注冊自己的信息,之后定時上報
2.2、Nameserver
-
理解成zookeeper的效果,只是他沒用zk,而是自己寫了個nameserver來替代zk
-
底層由netty實現,提供了路由管理、服務注冊、服務發現的功能,是一個無狀態節點
-
nameserver是服務發現者,集群中各個角色(producer、broker、consumer等)都需要定時向nameserver上報自己的狀態,以便互相發現彼此,超時不上報的話,nameserver會把它從列表中剔除
-
nameserver可以部署多個,當多個nameserver存在的時候,其他角色同時向他們上報信息,以保證高可用,
-
NameServer集群間互不通信,沒有主備的概念
-
nameserver內存式存儲,nameserver中的broker、topic等信息默認不會持久化,所以他是無狀態節點
2.3、Producer
-
消息的生產者
-
隨機選擇其中一個NameServer節點建立長連接,獲得Topic路由信息(包括topic下的queue,這些queue分布在哪些broker上等等)
-
接下來向提供topic服務的master建立長連接(因為rocketmq只有master才能寫消息),且定時向master發送心跳
2.4、Consumer
-
消息的消費者
-
通過NameServer集群獲得Topic的路由信息,連接到對應的Broker上消費消息
-
由于Master和Slave都可以讀取消息,因此Consumer會與Master和Slave都建立連接進行消費消息
3、核心流程
-
Broker都注冊到Nameserver上
-
Producer發消息的時候會從Nameserver上獲取發消息的topic信息
-
Producer向提供服務的所有master建立長連接,且定時向master發送心跳
-
Consumer通過NameServer集群獲得Topic的路由信息
-
Consumer會與所有的Master和所有的Slave都建立連接進行監聽新消息
五、核心概念
1、Message
消息載體。Message發送或者消費的時候必須指定Topic。Message有一個可選的Tag項用于過濾消息,還可以添加額外的鍵值對。
2、topic
消息的邏輯分類,發消息之前必須要指定一個topic才能發,就是將這條消息發送到這個topic上。消費消息的時候指定這個topic進行消費。就是邏輯分類。
3、queue
1個Topic會被分為N個Queue,數量是可配置的。message本身其實是存儲到queue上的,消費者消費的也是queue上的消息。多說一嘴,比如1個topic4個queue,有5個Consumer都在消費這個topic,那么會有一個consumer浪費掉了,因為負載均衡策略,每個consumer消費1個queue,5>4,溢出1個,這個會不工作。
4、Tag
Tag 是 Topic 的進一步細分,顧名思義,標簽。每個發送的時候消息都能打tag,消費的時候可以根據tag進行過濾,選擇性消費。
5、Message Model
消息模型:集群(Clustering)和廣播(Broadcasting)
6、Message Order
消息順序:順序(Orderly)和并發(Concurrently)
7、Producer Group
消息生產者組
8、Consumer Group
消息消費者組
六、ACK
首先要明確一點:ACK機制是發生在Consumer端的,不是在Producer端的。也就是說Consumer消費完消息后要進行ACK確認,如果未確認則代表是消費失敗,這時候Broker會進行重試策略(僅集群模式會重試)。ACK的意思就是:Consumer說:ok,我消費成功了。這條消息給我標記成已消費吧。
七、消費模式
1、集群模式(Clustering)
1.1、圖解
1.2、特點
-
每條消息只需要被處理一次,broker只會把消息發送給消費集群中的一個消費者
-
在消息重投時,不能保證路由到同一臺機器上
-
消費狀態由broker維護
2、廣播模式(Broadcasting)
2.1、圖解
2.2、特點
-
消費進度由consumer維護
-
保證每個消費者都消費一次消息
-
消費失敗的消息不會重投
八、Java API
說明:
-
RocketMQ服務端版本為目前最新版:4.7.0
-
Java客戶端版本采取的目前最新版:4.7.0
pom如下
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version> </dependency>1、Producer
發消息肯定要必備如下幾個條件:
-
指定生產組名(不能用默認的,會報錯)
-
配置namesrv地址(必須)
-
指定topic name(必須)
-
指定tag/key(可選)
驗證消息是否發送成功:消息發送完后可以啟動消費者進行消費,也可以去管控臺上看消息是否存在。
1.1、send(同步)
public?class?Producer?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello worldMessage?msg?=?new?Message("myTopic001",?"hello?world".getBytes());//?發送消息到mq,同步的SendResult?result?=?producer.send(msg);System.out.println("發送消息成功!result is :?"?+?result);//?關閉Producerproducer.shutdown();System.out.println("生產者 shutdown!");} }輸出結果:
發送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854140F418B4AAC26F7973910000, offsetMsgId=7B39B49D00002A9F00000000000589BE,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=0],?queueOffset=7] 生產者 shutdown!1.2、send(批量)
public?class?ProducerMultiMsg?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();String?topic?=?"myTopic001";//?創建消息對象,topic為:myTopic001,消息內容為:hello world1/2/3Message?msg1?=?new?Message(topic,?"hello?world1".getBytes());Message?msg2?=?new?Message(topic,?"hello?world2".getBytes());Message?msg3?=?new?Message(topic,?"hello?world3".getBytes());//?創建消息對象的集合,用于批量發送List<Message>?msgs?=?new?ArrayList<>();msgs.add(msg1);msgs.add(msg2);msgs.add(msg3);//?批量發送的api的也是send(),只是他的重載方法支持List<Message>,同樣是同步發送。SendResult?result?=?producer.send(msgs);System.out.println("發送消息成功!result is :?"?+?result);//?關閉Producerproducer.shutdown();System.out.println("生產者 shutdown!");} }輸出結果:
發送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854139C418B4AAC26F7D13770000,A9FE854139C418B4AAC26F7D13770001,A9FE854139C418B4AAC26F7D13770002, offsetMsgId=7B39B49D00002A9F0000000000058A62,7B39B49D00002A9F0000000000058B07,7B39B49D00002A9F0000000000058BAC,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=0],?queueOffset=8] 生產者 shutdown!從結果中可以看到只有一個msgId,所以可以發現雖然是三條消息對象,但是卻只發送了一次,大大節省了client與server的開銷。
錯誤情況:
批量發送的topic必須是同一個,如果message對象指定不同的topic,那么批量發送的時候會報錯:
Exception?in?thread?"main"?org.apache.rocketmq.client.exception.MQClientException:?Failed?to?initiate?the?MessageBatch For?more?information,?please?visit?the?url,?http://rocketmq.apache.org/docs/faq/at?org.apache.rocketmq.client.producer.DefaultMQProducer.batch(DefaultMQProducer.java:950)at?org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:898)at?com.chentongwei.mq.rocketmq.ProducerMultiMsg.main(ProducerMultiMsg.java:29) Caused?by:?java.lang.UnsupportedOperationException:?The?topic?of?the?messages?in?one?batch?should?be?the?sameat?org.apache.rocketmq.common.message.MessageBatch.generateFromList(MessageBatch.java:58)at?org.apache.rocketmq.client.producer.DefaultMQProducer.batch(DefaultMQProducer.java:942)...?2?more1.3、sendCallBack(異步)
public?class?ProducerASync?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello world asyncMessage?msg?=?new?Message("myTopic001",?"hello?world?async".getBytes());//?進行異步發送,通過SendCallback接口來得知發送的結果producer.send(msg,?new?SendCallback()?{//?發送成功的回調接口@Overridepublic?void?onSuccess(SendResult?sendResult)?{System.out.println("發送消息成功!result is :?"?+?sendResult);}//?發送失敗的回調接口@Overridepublic?void?onException(Throwable?throwable)?{throwable.printStackTrace();System.out.println("發送消息失敗!result is :?"?+?throwable.getMessage());}});producer.shutdown();System.out.println("生產者 shutdown!");} }輸出結果:
生產者 shutdown! java.lang.IllegalStateException:?org.apache.rocketmq.remoting.exception.RemotingConnectException:?connect?to?[124.57.180.156:9876]?failedat?org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:681)at?org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:511)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:692)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:556)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.access$300(DefaultMQProducerImpl.java:97)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$4.run(DefaultMQProducerImpl.java:510)at?java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at?java.util.concurrent.FutureTask.run(FutureTask.java:266)at?java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at?java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at?java.lang.Thread.run(Thread.java:745) Caused?by:?org.apache.rocketmq.remoting.exception.RemotingConnectException:?connect?to?[124.57.180.156:9876]?failedat?org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel(NettyRemotingClient.java:441)at?org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(NettyRemotingClient.java:396)at?org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:365)at?org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1371)at?org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1361)at?org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:624)...?10?more 發送消息失敗!result is : org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [124.57.180.156:9876]?failed為啥報錯了?很簡單,他是異步的,從結果就能看出來,由于是異步的,我還沒發送到mq呢,你就先給我shutdown了。肯定不行,所以我們在shutdown前面sleep 1s在看效果
public?class?ProducerASync?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello world asyncMessage?msg?=?new?Message("myTopic001",?"hello?world?async".getBytes());//?進行異步發送,通過SendCallback接口來得知發送的結果producer.send(msg,?new?SendCallback()?{//?發送成功的回調接口@Overridepublic?void?onSuccess(SendResult?sendResult)?{System.out.println("發送消息成功!result is :?"?+?sendResult);}//?發送失敗的回調接口@Overridepublic?void?onException(Throwable?throwable)?{throwable.printStackTrace();System.out.println("發送消息失敗!result is :?"?+?throwable.getMessage());}});Thread.sleep(1000);producer.shutdown();System.out.println("生產者 shutdown!");} }輸出結果:
發送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854106E418B4AAC26F8719B20000, offsetMsgId=7B39B49D00002A9F0000000000058CFC,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=1],?queueOffset=2] 生產者 shutdown!1.4、sendOneway
public?class?ProducerOneWay?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello world onewayMessage?msg?=?new?Message("myTopic001",?"hello?world?oneway".getBytes());//?效率最高,因為oneway不關心是否發送成功,我就投遞一下我就不管了。所以返回是voidproducer.sendOneway(msg);System.out.println("投遞消息成功!,注意這里是投遞成功,而不是發送消息成功哦!因為我sendOneway也不知道到底成沒成功,我沒返回值的。");producer.shutdown();System.out.println("生產者 shutdown!");} }輸出結果:
投遞消息成功!,注意這里是投遞成功,而不是發送消息成功哦!因為我sendOneway也不知道到底成沒成功,我沒返回值的。 生產者 shutdown!1.5、效率對比
sendOneway > sendCallBack > send批量 > send單條
很容易理解,sendOneway不求結果,我就負責投遞,我不管你失敗還是成功,相當于中轉站,來了我就扔出去,我不進行任何其他處理。所以最快。
而sendCallBack是異步發送肯定比同步的效率高。
send批量和send單條的效率也是分情況的,如果只有1條msg要發,那還搞毛批量,直接send單條完事。
2、Consumer
每個consumer只能關注一個topic。
發消息肯定要必備如下幾個條件:
-
指定消費組名(不能用默認的,會報錯)
-
配置namesrv地址(必須)
-
指定topic name(必須)
-
指定tag/key(可選)
2.1、CLUSTERING
集群模式,默認。
比如啟動五個Consumer,Producer生產一條消息后,Broker會選擇五個Consumer中的其中一個進行消費這條消息,所以他屬于點對點消費模式。
public?class?Consumer?{public?static?void?main(String[]?args)?throws?Exception?{//?指定消費組名為my-consumerDefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("my-consumer");//?配置namesrv地址consumer.setNamesrvAddr("124.57.180.156:9876");//?訂閱topic:myTopic001 下的全部消息(因為是*,*指定的是tag標簽,代表全部消息,不進行任何過濾)consumer.subscribe("myTopic001",?"*");//?注冊監聽器,進行消息消息。consumer.registerMessageListener(new?MessageListenerConcurrently()?{@Overridepublic?ConsumeConcurrentlyStatus?consumeMessage(List<MessageExt>?msgs,?ConsumeConcurrentlyContext?consumeConcurrentlyContext)?{for?(MessageExt?msg?:?msgs)?{String?str?=?new?String(msg.getBody());//?輸出消息內容System.out.println(str);}//?默認情況下,這條消息只會被一個consumer消費,這叫點對點消費模式。也就是集群模式。//?ack確認return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//?啟動消費者consumer.start();System.out.println("Consumer?start");} }2.2、BROADCASTING
廣播模式。
比如啟動五個Consumer,Producer生產一條消息后,Broker會把這條消息廣播到五個Consumer中,這五個Consumer分別消費一次,每個都消費一次。
//?代碼里只需要添加如下這句話即可: consumer.setMessageModel(MessageModel.BROADCASTING);?2.3、兩種模式對比
-
集群默認是默認的,廣播模式是需要手動配置。
-
一條消息:集群模式下的多個Consumer只會有一個Consumer消費。廣播模式下的每一個Consumer都會消費這條消息。
-
廣播模式下,發送一條消息后,會被當前被廣播的所有Consumer消費,但是后面新加入的Consumer不會消費這條消息,很好理解:村里面大喇叭喊了全村來領雞蛋,第二天你們村新來個人,那個人肯定聽不到昨天大喇叭喊的消息呀。
3、TAG&&KEY
發送/消費 消息的時候可以指定tag/key來進行過濾消息,支持通配符。*代表消費此topic下的全部消息,不進行過濾。
看下org.apache.rocketmq.common.message.Message源碼可以發現發消息的時候可以指定tag和keys:
public?Message(String?topic,?String?tags,?String?keys,?byte[]?body)?{this(topic,?tags,?keys,?0,?body,?true); }比如:
public?class?ProducerTagsKeys?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello world,且tags為:test-tags,keys為test-keysMessage?msg?=?new?Message("myTopic001",?"test-tags",?"test-keys",?"hello?world".getBytes());//?發送消息到mq,同步的SendResult?result?=?producer.send(msg);System.out.println("發送消息成功!result is :?"?+?result);//?關閉Producerproducer.shutdown();System.out.println("生產者 shutdown!");} }輸出結果:
發送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854149DC18B4AAC26FA4B7200000, offsetMsgId=7B39B49D00002A9F0000000000058DA6,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=3],?queueOffset=3] 生產者 shutdown!查看管控臺,可以發現tags和keys已經生效了:
消費的時候如果指定*那就是此topic下的全部消息,我們可以指定前綴通配符,比如:
//?這樣就只會消費myTopic001下的tag為test-*開頭的消息。 consumer.subscribe("myTopic001",?"test-*");//?代表訂閱Topic為myTopic001下的tag為TagA或TagB的所有消息 consumer.subscribe("myTopic001",?"TagA||TagB");還支持SQL表達式過濾,不是很常用。不BB了。
4、常見錯誤
4.1、sendDefaultImpl call timeout
4.1.1、異常
Exception?in?thread?"main"?org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:?sendDefaultImpl?call?timeoutat?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:666)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)at?org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)at?com.chentongwei.mq.rocketmq.Producer.main(Producer.java:18)4.1.2、解決
1.如果你是云服務器,首先檢查安全組是否允許9876這個端口訪問,是否開啟了防火墻,如果開啟了的話是否將9876映射了出去。
2.修改配置文件broker.conf,加上:
brokerIP1=我用的是阿里云服務器,這里是我的公網IP啟動namesrv和broker的時候加上本機IP(我用的是阿里云服務器,這里是我的公網IP):
./bin/mqnamesrv?-n?IP:9876 ./bin/mqbroker?-n?IP:9876?-c?conf/broker.conf4.2、No route info of this topic
4.2.1、異常
Exception?in?thread?"main"?org.apache.rocketmq.client.exception.MQClientException:?No?route?info?of?this?topic:?myTopic001 See?http://rocketmq.apache.org/docs/faq/?for?further?details.at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:684)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)at?org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)at?com.chentongwei.mq.rocketmq.Producer.main(Producer.java:18)4.2.2、解決
很明顯發送成功了,不再是剛才的超時了,但是告訴我們沒有這個topic。那不能每次都手動創建呀,所以啟動broker的時候可以指定參數讓broker為我們自動創建。如下
./bin/mqbroker?-n?IP:9876?-c?conf/broker.conf?autoCreateTopicEnable=true 推薦文章-
硬剛一周,3W字總結,一年的經驗告訴你如何準備校招!
-
今年的校招,Java 好拿 offer 嗎?
-
10月了,該聊聊今年秋招了!
-
聊聊在騰訊實習快一個月的感受
總結
以上是生活随笔為你收集整理的RocketMQ入门到入土(一)新手也能看懂的原理和实战!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一款java代码生成器(我受够了加班),
- 下一篇: 「五大常用算法」一文图解分治算法和思想