志宇-RocketMQ学习
RocketMQ
- RocketMQ安裝
- RocketMQ-console安裝
- RocketMQ簡單使用
- RabbitMQ核心概念
- 消息發送狀態(返回對象中的枚舉類型有4種)
- 重試次數
- RocketMQ發送消息三種方式
- RocketMQ延遲消息設置
- 消息順序消費
- 消費端配置參數詳情
- tag標簽
- 消費模式
- Offset和CommitLog
- 事務使用
- 集群部署
RocketMQ安裝
官方安裝方法
先后安裝Name Server(起到路由功能)和 Broker(RocketMQ服務) 然后測試下發送和接收可用
內存不夠在/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/bin目錄下修改runbroker.sh tools.sh runserver.sh這三個文件中JAVA_OPT參數,修改完如下
#僅僅修改已經存在的配置即可,將4g換成256m或者128m #runbroker.sh broker占用的內存大小 #tools.sh 測試發送和接收工具的內存大小 #runserver.sh 路由的內存和大小 JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"RocketMQ-console安裝
找到rocketmq-console/src/main/resources/application.properties 修改配置
#console端口 server.port=17890 #name server地址 rocketmq.config.namesrvAddr=localhost:9876找到rocketmq-console目錄下的pom.xml文件,修改配置
<rocketmq.version>4.4.0XXX</rocketmq.version> 4.4.0XXX 修改為 4.4.0(你的RocketMQ版本) <rocketmq.version>4.4.0</rocketmq.version>進入rocketmq-console目錄,編譯打包 mvn clean package -Dmaven.test.skip=true
進入target目錄 ,啟動 java -jar rocketmq-console-ng-1.0.0.jar
守護進程方式啟動 nohup java -jar rocketmq-console-ng-1.0.0.jar &
RocketMQ簡單使用
常見錯誤一
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout解決辦法
因為服務器中可能有多塊網卡,rocketmq要指定公網ip
在配置文件rocketmq-all-4.8.0-source-release/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf中添加
brokerIP1=39.107.109.94此ip為公網ip
然后重新啟動 ,啟動命令如下:
jps 找到啟動進程,kill -9 關閉進程,然后已守護進程啟動
nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
常見錯誤二
MQClientException: No route info of this topic, TopicTest1解決辦法
來到/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0目錄下
查看已經配置信息sh bin/mqbroker -m 配置信息如下
修改配置文件distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf中autoCreateTopicEnable=true,true 則代表可以自動創建topic,生產上要關閉,如果這個參數是true還創建不了topic可能是因為程序引入jar包的版本和RabbitMQ版本不同導致的
常用配置說明
compressMsgBodyOverHowmuch :消息超過默認字節4096后進行壓縮 retryTimesWhenSendFailed : 失敗重發次數 maxMessageSize : 最大消息配置,默認128k topicQueueNums : 主題下面的隊列數量,默認是4 autoCreateTopicEnable : 是否自動創建主題Topic, 開發建議為true,生產要為false defaultTopicQueueNums : 自動創建服務器不存在的topic,默認創建的隊列數 autoCreateSubscriptionGroup: 是否允許 Broker 自動創建訂閱組,建議線下開發開啟,線上關閉 brokerClusterName : 集群名稱 brokerId : 0表示Master主節點 大于0表示從節點 brokerIP1 : Broker服務地址 brokerRole : broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE deleteWhen : 每天執行刪除過期文件的時間,默認每天凌晨4點 flushDiskType :刷盤策略, 默認為 ASYNC_FLUSH(異步刷盤), 另外是SYNC_FLUSH(同步刷盤) listenPort : Broker監聽的端口號 mapedFileSizeCommitLog : 單個conmmitlog文件大小,默認是1GB mapedFileSizeConsumeQueue:ConsumeQueue每個文件默認存30W條,可以根據項目調整 storePathRootDir : 存儲消息以及一些配置信息的根目錄 默認為用戶的 ${HOME}/store storePathCommitLog:commitlog存儲目錄默認為${storePathRootDir}/commitlog storePathIndex: 消息索引存儲路徑 syncFlushTimeout : 同步刷盤超時時間 diskMaxUsedSpaceRatio : 檢測可用的磁盤空間大小,超過后會寫入報錯(磁盤沒有滿卻寫入不進RabbitMQ是就有可能是因為這個參數,這個參數就是當磁盤還剩余百分之多少時就不允許在寫入消息了)常見錯誤三
控制臺查看不了數據,提示連接 10909錯誤,因為RocketMQ自帶VIP虛擬ip技術,這時要防火墻要開放10909端口,才能使用
RabbitMQ核心概念
消息發送狀態(返回對象中的枚舉類型有4種)
生產者發送完信息返回的類型(返回類型SendResult中的SendStatus成員變量)
FLUSH_DISK_TIMEOUT
在指定時間沒有將消息同步到磁盤中(刷盤策略需要為SYNC_FLUSH 才會出這個錯誤)
例如:在cpu爆滿時候導致沒有刷盤成功
FLUSH_SLAVE_TIMEOUT
主從模式下,broker是SYNC_MASTER, 沒有在規定時間內完成主從同步
例如:網路原因等,導致主從同步沒有成功,如果是異步復制則不會出現這個問題
SLAVE_NOT_AVAILABLE
從模式下,broker是SYNC_MASTER, 但是沒有找到被配置成Slave的Broker
例如:主從模式下,所有從節點宕機,如果是異步復制則不會出現這個問題
SEND_OK
發送成功,沒有發生上面的三種問題
重試次數
生產者重試:RocketMQ中默認次數是2次,一般不是跨國調用不用修改重試次數了
重試次數設置方法如下
異步發送不會重試,需自己書寫代碼重試
//這里會開啟一個線程異步發送消息 produceProxy.getProducet().send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {//處理消息}@Overridepublic void onException(Throwable e) {//進行重試} });消費者重試:默認次數16次,當網絡中斷、消費者報錯、ack確認失敗導致重試
手動設置重試次數方法如下
RocketMQ發送消息三種方式
同步發送: 發送驗證碼,郵件通知中使用,速快快,當前線程反饋,可靠
異步發送:注冊發送優惠券中使用,速快快,非前線程反饋,可靠
OneWay: 日志采集中使用,速度最快,沒有反饋,相對不可靠
RocketMQ延遲消息設置
開源版本不支持定時發送,只支持固定的發送時間
在源碼中rocketmq-store 項目> MessageStoreConfig.java >的成員變量 messageDelayLevel中有固定的延遲時間
發送消息時設定延遲等級
Message message = new Message("topicName","TagA","sendMessage"); //0等級代表不延遲 message.setDelayTimeLevel(0) //1代表延遲一秒鐘 message.setDelayTimeLevel(1) //2代表延遲5秒鐘 //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 以此類推 message.setDelayTimeLevel(2)消息順序消費
全局順序消費:例如比特幣交易過程中人民幣換成美元時候按照從低到高的順序
局部順序消費:例如 訂單狀態的消息提醒
局部順序的使用說明:
1、RocketMQ上默認一個topic上有4個queue,順序消費要將消息投遞到同一個topic對應的同一個隊列中(根據業務id取模然后投遞到同一個隊列上,通過MessageQueueSelector實現類實現)
2、順序消息暫不支持廣播模式、異步發送方式
3、順序消費支持多個消費者進行消費(消費者消費前會對消費的隊列加鎖)
4、消費者部署的節點數要小于此topic對應的queue的數量(消費數量會均等分給消費者,不然有的消費者收不到消息)
5、消費者單線程處理,使用MessageListenerOrderly替代MessageListenerConcurrently
發送消息偽代碼如下
接收消息偽代碼如下
//這里使用MessageListenerOrderly替代MessageListenerConcurrently(使用單線程消費) consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {MessageExt messageExt = msgs.get(0);String body = new String(messageExt.getBody(), "utf-8");System.out.println(body);return ConsumeOrderlyStatus.SUCCESS;}} );消費端配置參數詳情
//設置組 一個項目中一個組對應一個消費者 consumer = new DefaultMQPushConsumer(RocketMQConfig.groupName); //設置地址 consumer.setNamesrvAddr(RocketMQConfig.serverAddresses); //設置訂閱對應的topic consumer.subscribe(RocketMQConfig.topicName, "*"); //設置默認消費隊列中最后一個,默認也是這個配置 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //消費者均等消費策略 ,默認也是這個配置 consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()); //設置存儲在本地還是遠程,默認廣播存儲在消費者本地、其他存儲在遠程 //consumer.setOffsetStore(new RemoteBrokerOffsetStore()); //設置消費者最大線程數 consumer.setConsumeThreadMax(100); //設置消費者最小線程數 consumer.setConsumeThreadMin(5); //消費者一次從mq中拉取多少條數據 consumer.setPullBatchSize(32); //拉取后每次消費多少條 consumer.setConsumeMessageBatchMaxSize(1); //設置為廣播模式,如果是廣播模式則重試次數失效 //consumer.setMessageModel(MessageModel.BROADCASTING); //設置為集群模式,默認是集群模式 consumer.setMessageModel(MessageModel.CLUSTERING);tag標簽
消費者可以選擇消費 某個Group 中的某個 topic 中的指定Tag
1、消費者手動過濾Tag (沒有用到的tag也進行傳輸浪費資源)
2、RocketMQ選擇發送給消費者
消費模式
1、Broker獲得消息然后將消息發送給消費者(消費者壓力大)
2、消費者間隔去向Broker拉取(沒有消息也會去拉取、間隔時間不好設置)
3、Broker和消費者之間每15秒發起一次長連接(默認)
Offset和CommitLog
CommitLog用于存儲發送的消息內容
Offset 用于存儲消息存儲在隊列中的下標
CommitLog默認存儲位置 根目錄下store/consumequeue/{topicName}/{queueid}/fileName
事務使用
集群部署
推薦同步雙寫、異步刷盤
總結
以上是生活随笔為你收集整理的志宇-RocketMQ学习的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: html embed用法 Embed
- 下一篇: 考研数学若干题解析