rocketmq 初探(一)
大家好,我是烤鴨:
????今天看下rocketmq。這篇主要是簡單介紹下 rocketmq以及idea 本地調試 rocketmq。
項目架構
感興趣的可以下載源碼看下。
https://github.com/apache/rocketmq
項目結構圖。
rocketmq-acl: acl 秘鑰方式的鑒權,用在broker端。
rocketmq-broker:整個mq的核心,他能夠接受producer和consumer的請求,并調用store層服務對消息進行處理。HA服務的基本單元,支持同步雙寫,異步雙寫等模式。
rocketmq-client:mq客戶端實現,目前官方僅僅開源了java版本的mq客戶端,c++,go客戶端有社區開源貢獻。
rocketmq-common:一些模塊間通用的功能類,比如一些配置文件、常量。
rocketmq-distribution:腳本、配置模塊。
rocketmq-example:官方提供的例子。
rocketmq-filtersrv:消息過濾服務,相當于在broker和consumer中間加入了一個filter代理。
rocketmq-logappender:日志
rocketmq-logging:日志
rocketmq-namesrv:NameServer,類似服務注冊中心,broker在這里注冊,consumer和producer在這里找到broker地址
rocketmq-openmessaging:RocketMQ支持openmessaging,詳見:https://rocketmq.apache.org/docs/openmessaging-example/
rocketmq-remoting:基于netty的底層通信實現,所有服務間的交互都基于此模塊。
rocketmq-srvut:解析命令行的工具類。
rocketmq-store:存儲層實現,同時包括了索引服務,高可用HA服務實現。
rocketmq-tools:命令行工具,提供了消息查詢等功能。
下面重點說一下幾個模塊:
注冊中心 namesrv、broker、client 和 store,先看一下關系。
看這個圖是不是有點相似,沒錯,跟 dubbo 很像,除了多了 broker。
nameserver 是注冊中心,用來記錄broker信息、broker和topic關系。
producer 從nameserver 獲取broker信息,進行消息發送。
consumer 從nameserver 獲取broker信息,進行消息消費。
idea 導入源碼,本地調試
設置 rocketmq _home 目錄,后邊的namesrv和broker會用到。新建conf目錄,并將 rocket-distribution 的conf里的broker.conf、logback_broker.xml、
logback_namesrv.xml、logback_tools.xml 復制到新建的conf目錄中。我這里設置的目錄是 E:\my\rocketmq
我這里修改了日志目錄,方便查看日志。
啟動 NamesrvStartup
Connected to the target VM, address: '127.0.0.1:58819', transport: 'socket' Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation Disconnected from the target VM, address: '127.0.0.1:58819', transport: 'socket'啟動參數配置 rocketmq_home 的環境變量,ROCKETMQ_HOME=E:\my\rocketmq
啟動成功:
Connected to the target VM, address: '127.0.0.1:50261', transport: 'socket' The Name Server boot success. serializeType=JSON會發現 rocketmq_home 目錄下生成了 logs/rocketmqlogs 目錄,存放的是日志文件。
啟動broker
設置啟動參數和 rocketmq_home 的環境變量 :
autoCreateTopicEnable=true 是為了測試的時候可以發送時創建topic,默認是 false(不建議開啟,避免并發發送時,topic重復問題)
-c E:\my\rocketmq\conf\broker.conf -n localhost:9876 autoCreateTopicEnable=true ROCKETMQ_HOME=E:\my\rocketmq會發現 rocketmq_home 目錄下生成了 store 目錄,存放的是broker維護的信息,像消費者的偏移量、延遲隊列的偏移量、topic。
啟動consumer
rocketmq-example 項目下,example\src\main\java\org\apache\rocketmq\example\quickstart\Consumer.java
指定broker地址:
consumer.setNamesrvAddr("localhost:9876");啟動producer并發送消息
rocketmq-example 項目下,example\src\main\java\org\apache\rocketmq\example\quickstart\Producer.java
指定broker地址,修改循環次數為2次:
producer.setNamesrvAddr("localhost:9876");發送成功:
SendResult [sendStatus=SEND_OK, msgId=7F000001395C18B4AAC22C7A99940000, offsetMsgId=0AA80D1200002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=7F000001395C18B4AAC22C7A99D80001, offsetMsgId=0AA80D1200002A9F00000000000000C9, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]消費端接收成功:
ConsumeMessageThread_1 Receive_1 New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1625815032213, bornHost=/10.168.13.18:57729, storeTimestamp=1625815032241, storeHost=/10.168.13.18:10911, msgId=0AA80D1200002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1625815055025, UNIQ_KEY=7F000001395C18B4AAC22C7A99940000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] ConsumeMessageThread_2 Receive_1 New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1625815032280, bornHost=/10.168.13.18:57729, storeTimestamp=1625815032282, storeHost=/10.168.13.18:10911, msgId=0AA80D1200002A9F00000000000000C9, commitLogOffset=201, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1625815056025, UNIQ_KEY=7F000001395C18B4AAC22C7A99D80001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]當發送的時候 store目錄下會生成 commitLog 目錄(消息內容)和consumequeue目錄(存的是topic和queueId)
commitLog目錄 默認上來生成兩個文件,2個G。
consumequeue目錄,一級子目錄是topic,二級子目錄是queueId
mq 控制臺
rocketmq-console 在另一個倉庫,地址:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
啟動成功,畫面還是比較清新的:
模擬發送100條,可以看到每個broker的數量:
console功能還是有很多可以再開發的地方,官方基本不維護了,需要的可能得二次開發了。
像我們就開發了類似報表、報警、監控等一些功能,還是比較方便的。
總結
以上是生活随笔為你收集整理的rocketmq 初探(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机水平拼音怎么写,电脑上的拼音到底是
- 下一篇: python之函数用法startswit