rocketmq删除topic_RocketMq 快速入门教程
今年是不平凡的一年,對(duì)于每個(gè)個(gè)體都是。不論我們?cè)谀膬?#xff0c;經(jīng)歷了什么,向前走總沒(méi)錯(cuò)。雖然方向也很重要,但是不要在一個(gè)地方停太久,You young
編者薦語(yǔ):RocketMQ 逐漸成為最主流的消息隊(duì)列,學(xué)習(xí) RocketMQ 是每個(gè)攻城獅要做的事
一、消息中間件
什么是消息系統(tǒng)
二、RocketMq簡(jiǎn)介
發(fā)展
概念術(shù)語(yǔ)
架構(gòu)組成
三、安裝 RocketMq
系統(tǒng)環(huán)境
安裝部署 RocketMQ
RocketMq插件(可視化)
四、配置 RocketMq
啟動(dòng)RocketMQ
驗(yàn)證啟動(dòng)是否成功
生產(chǎn)環(huán)境 ACL 權(quán)限認(rèn)證
五、RocketMq 集群
RocketMq 集群、拓展閱讀
六、SpringBoot 集成 RocketMq
七、SpringMVC 架構(gòu)中使用 Demo
參考
一、消息中間件
什么是消息系統(tǒng)
簡(jiǎn)單來(lái)說(shuō):消息被發(fā)送到隊(duì)列中。“消息隊(duì)列”(Message Queue)是在消息的傳輸過(guò)程中保存消息的容器。
- 拓展閱讀
消息隊(duì)列作為高并發(fā)系統(tǒng)的核心組件之一,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開(kāi)發(fā)效率和系統(tǒng)穩(wěn)定性。主要具有以下優(yōu)勢(shì):
削峰填谷(主要解決瞬時(shí)寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失、系統(tǒng)奔潰等問(wèn)題) 系統(tǒng)解耦(解決不同重要程度、不同能力級(jí)別系統(tǒng)之間依賴導(dǎo)致一死全死) 提升性能(當(dāng)存在一對(duì)多調(diào)用時(shí),可以發(fā)一條消息給消息系統(tǒng),讓消息系統(tǒng)通知相關(guān)系統(tǒng)) 蓄流壓測(cè)(線上有些鏈路不好壓測(cè),可以通過(guò)堆積一定量消息再放開(kāi)來(lái)壓測(cè)) 目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要優(yōu)勢(shì)特性有:
? 支持事務(wù)型消息(消息發(fā)送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持) ? 支持結(jié)合rocketmq的多個(gè)系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù),二方事務(wù)是前提) ? 支持18個(gè)級(jí)別的延遲消息(rabbitmq和kafka不支持) ? 支持指定次數(shù)和時(shí)間間隔的失敗消息重發(fā)(kafka不支持,rabbitmq需要手動(dòng)確認(rèn)) ? 支持consumer端tag過(guò)濾,減少不必要的網(wǎng)絡(luò)傳輸(rabbitmq和kafka不支持) ? 支持重復(fù)消費(fèi)(rabbitmq不支持,kafka支持)
二、RocketMq簡(jiǎn)介
官網(wǎng)地址:http://rocketmq.apache.org/
發(fā)展
阿里巴巴消息中間件起源于2001年的五彩石項(xiàng)目,Notify 在這期間應(yīng)運(yùn)而生,用于交易核心消息的流轉(zhuǎn)。
至2010年,B2B開(kāi)始大規(guī)模使用ActiveMQ作為消息內(nèi)核,隨著阿里業(yè)務(wù)的快速發(fā)展,急需一款支持順序消息,擁有海量消息堆積能力的消息中間件,MetaQ 1.0在2011年誕生。
到2012年,MetaQ已經(jīng)發(fā)展到了MetaQ 3.0,并抽象出了通用的消息引擎RocketMQ。隨后,將RocketMQ進(jìn)行了開(kāi)源,阿里的消息中間件正式走入了公眾的視野。
到2015年,RocketMQ已經(jīng)經(jīng)歷了多年雙十一的洗禮,在可用性、可靠性以及穩(wěn)定性等方面都有出色的表現(xiàn)。與此同時(shí),云計(jì)算大行其道,阿里消息中間件基于RocketMQ推出了Aliware MQ 1.0,開(kāi)始為阿里云上成千上萬(wàn)家企業(yè)提供消息服務(wù)。
到今年,MetaQ在2016年雙十一承載了萬(wàn)億級(jí)消息的流轉(zhuǎn),跨越了一個(gè)新的里程碑,同時(shí)RocketMQ進(jìn)入Apache 孵化。
RocketMQ可以保證嚴(yán)格的消息順序
概念術(shù)語(yǔ)
Producer Group
標(biāo)識(shí)發(fā)送同一類消息的Producer,通常發(fā)送邏輯一致。發(fā)送普通消息的時(shí)候,僅標(biāo)識(shí)使用,并無(wú)特別用處。若事務(wù)消息,如果某條發(fā)送某條消息的producer-A宕機(jī),使得事務(wù)消息一直處于PREPARED狀態(tài)并超時(shí),則broker會(huì)回查同一個(gè)group的其 他producer,確認(rèn)這條消息應(yīng)該commit還是rollback。但開(kāi)源版本并不支持事務(wù)消息。
Consumer Group
標(biāo)識(shí)一類Consumer的集合名稱,這類Consumer通常消費(fèi)一類消息,且消費(fèi)邏輯一致。同一個(gè)Consumer Group下的各個(gè)實(shí)例將共同消費(fèi)topic的消息,起到負(fù)載均衡的作用。
消費(fèi)進(jìn)度以Consumer Group為粒度管理,不同Consumer Group之間消費(fèi)進(jìn)度彼此不受影響,即消息A被Consumer Group1消費(fèi)過(guò),也會(huì)再給Consumer Group2消費(fèi)。
注:RocketMQ要求同一個(gè)Consumer Group的消費(fèi)者必須要擁有相同的注冊(cè)信息,即必須要聽(tīng)一樣的topic(并且tag也一樣)。
Topic
標(biāo)識(shí)一類消息的邏輯名字,消息的邏輯管理單位。無(wú)論消息生產(chǎn)還是消費(fèi),都需要指定Topic。
Tag
RocketMQ支持給在發(fā)送的時(shí)候給topic打tag,同一個(gè)topic的消息雖然邏輯管理是一樣的。但是消費(fèi)topic1的時(shí)候,如果你訂閱的時(shí)候指定的是tagA,那么tagB的消息將不會(huì)投遞。
Message Queue
簡(jiǎn)稱Queue或Q。消息物理管理單位。一個(gè)Topic將有若干個(gè)Q。若Topic同時(shí)創(chuàng)建在不通的Broker,則不同的broker上都有若干Q,消息將物理地存儲(chǔ)落在不同Broker結(jié)點(diǎn)上,具有水平擴(kuò)展的能力。
無(wú)論生產(chǎn)者還是消費(fèi)者,實(shí)際的生產(chǎn)和消費(fèi)都是針對(duì)Q級(jí)別。例如Producer發(fā)送消息的時(shí)候,會(huì)預(yù)先選擇(默認(rèn)輪詢)好該Topic下面的某一條Q地發(fā)送;Consumer消費(fèi)的時(shí)候也會(huì)負(fù)載均衡地分配若干個(gè)Q,只拉取對(duì)應(yīng)Q的消息。
每一條message queue均對(duì)應(yīng)一個(gè)文件,這個(gè)文件存儲(chǔ)了實(shí)際消息的索引信息。并且即使文件被刪除,也能通過(guò)實(shí)際純粹的消息文件(commit log)恢復(fù)回來(lái)。
Offset
RocketMQ中,有很多offset的概念。但通常我們只關(guān)心暴露到客戶端的offset。一般我們不特指的話,就是指邏輯Message Queue下面的offset。
注:邏輯offset的概念在RocketMQ中字面意思實(shí)際上和真正的意思有一定差別,這點(diǎn)在設(shè)計(jì)上顯得有點(diǎn)混亂。祥見(jiàn)下面的解釋。
可以認(rèn)為一條邏輯的message queue是無(wú)限長(zhǎng)的數(shù)組。一條消息進(jìn)來(lái)下標(biāo)就會(huì)漲1,而這個(gè)數(shù)組的下標(biāo)就是offset。
max offset
字面上可以理解為這是標(biāo)識(shí)message queue中的max offset表示消息的最大offset。但是從源碼上看,這個(gè)offset實(shí)際上是最新消息的offset+1,即:下一條消息的offset。
min offset:
標(biāo)識(shí)現(xiàn)存在的最小offset。而由于消息存儲(chǔ)一段時(shí)間后,消費(fèi)會(huì)被物理地從磁盤刪除,message queue的min offset也就對(duì)應(yīng)增長(zhǎng)。這意味著比min offset要小的那些消息已經(jīng)不在broker上了,無(wú)法被消費(fèi)。
consumer offset
字面上,可以理解為標(biāo)記Consumer Group在一條邏輯Message Queue上,消息消費(fèi)到哪里即消費(fèi)進(jìn)度。但從源碼上看,這個(gè)數(shù)值是消費(fèi)過(guò)的最新消費(fèi)的消息offset+1,即實(shí)際上表示的是下次拉取的offset位置。
消費(fèi)者拉取消息的時(shí)候需要指定offset,broker不主動(dòng)推送消息, offset的消息返回給客戶端。
consumer剛啟動(dòng)的時(shí)候會(huì)獲取持久化的consumer offset,用以決定從哪里開(kāi)始消費(fèi),consumer以此發(fā)起第一次請(qǐng)求。
每次消息消費(fèi)成功后,這個(gè)offset在會(huì)先更新到內(nèi)存,而后定時(shí)持久化。在集群消費(fèi)模式下,會(huì)同步持久化到broker,而在廣播模式下,則會(huì)持久化到本地文件。
架構(gòu)組成
RocketMQ集群部署三、安裝 RocketMq
系統(tǒng)環(huán)境
JDK8 Windows10
安裝部署 RocketMQ
地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/rocketmq-all-4.3.0-bin-release.zip
選擇 Binary 進(jìn)行下載
解壓
環(huán)境變量配置
必須配置
變量名:ROCKETMQ_HOME 變量值(絕對(duì)路徑):D:\rocketMq\rocketmq-all-4.3.0-bin-release
環(huán)境變量 Path 中也配置:%ROCKETMQ_HOME%\bin
RocketMq插件(可視化)
選擇性安裝
git clone https://github.com/apache/rocketmq-externals.git
解壓
修改配置
server.port=8088
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=127.0.0.1:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
cd ?\rocketmq-externals\rocketmq-console
mvn clean package -Dmaven.test.skip=true
cd ?\rocketmq-externals\rocketmq-console\target
java -jar rocketmq-console-ng-2.0.0.jar
瀏覽器訪問(wèn):http://127.0.0.1:8088
四、配置 RocketMq
啟動(dòng)RocketMQ
默認(rèn)配置啟動(dòng)
- 啟動(dòng)
啟動(dòng)NAMESERVER:
Cmd命令框執(zhí)行進(jìn)入至‘MQ文件夾\bin’下,然后執(zhí)行‘start mqnamesrv.cmd’,啟動(dòng)NAMESERVER。成功后會(huì)彈出提示框,此框勿關(guān)閉。
啟動(dòng)BROKER:
Cmd命令框執(zhí)行進(jìn)入至‘MQ文件夾\bin’下,然后執(zhí)行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,啟動(dòng)BROKER。成功后會(huì)彈出提示框,此框勿關(guān)閉。
驗(yàn)證啟動(dòng)是否成功
驗(yàn)證生產(chǎn)消息正常
執(zhí)行如下命令:
export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
能看到類似如下輸出:
SendResult [sendStatus=SEND_OK, msgId=C0A82BC5F36C511D50C05B41...
驗(yàn)證消費(fèi)消息正常
執(zhí)行如下命令:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
能看到類似如下輸出:
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, stor....
其他命令
RocketMQ 自帶的連接工具是 maadmin ,執(zhí)行 mqadmin.cmd 就可以查看有哪些命令參數(shù)可用。
$?mqadmin.cmdThe?most?commonly?used?mqadmin?commands?are:
???updateTopic??????????Update?or?create?topic
???deleteTopic??????????Delete?topic?from?broker?and?NameServer.
???updateSubGroup???????Update?or?create?subscription?group
???deleteSubGroup???????Delete?subscription?group?from?broker.
???updateBrokerConfig???Update?broker's?config
???updateTopicPerm??????Update?topic?perm
???topicRoute???????????Examine?topic?route?info
???topicStatus??????????Examine?topic?Status?info
???topicClusterList?????get?cluster?info?for?topic
???brokerStatus?????????Fetch?broker?runtime?status?data
???queryMsgById?????????Query?Message?by?Id
???queryMsgByKey????????Query?Message?by?Key
???queryMsgByUniqueKey??Query?Message?by?Unique?key
???queryMsgByOffset?????Query?Message?by?offset
???queryMsgByUniqueKey??Query?Message?by?Unique?key
???printMsg?????????????Print?Message?Detail
???printMsgByQueue??????Print?Message?Detail
???sendMsgStatus????????send?msg?to?broker.
???brokerConsumeStats???Fetch?broker?consume?stats?data
???producerConnection???Query?producer's?socket?connection?and?client?version
???consumerConnection???Query?consumer's?socket?connection,?client?version?and?subscription
???consumerProgress?????Query?consumers's?progress,?speed
???consumerStatus???????Query?consumer's?internal?data?structure
???cloneGroupOffset?????clone?offset?from?other?group.
???clusterList??????????List?all?of?clusters
???topicList????????????Fetch?all?topic?list?from?name?server
???updateKvConfig???????Create?or?update?KV?config.
???deleteKvConfig???????Delete?KV?config.
???wipeWritePerm????????Wipe?write?perm?of?broker?in?all?name?server
???resetOffsetByTime????Reset?consumer?offset?by?timestamp(without?client?restart).
???updateOrderConf??????Create?or?update?or?delete?order?conf
???cleanExpiredCQ???????Clean?expired?ConsumeQueue?on?broker.
???cleanUnusedTopic?????Clean?unused?topic?on?broker.
???startMonitoring??????Start?Monitoring
???statsAll?????????????Topic?and?Consumer?tps?stats
???allocateMQ???????????Allocate?MQ
???checkMsgSendRT???????check?message?send?response?time
???clusterRT????????????List?All?clusters?Message?Send?RT
???getNamesrvConfig?????Get?configs?of?name?server.
???updateNamesrvConfig??Update?configs?of?name?server.
???getBrokerConfig??????Get?broker?config?by?cluster?or?special?broker!
???queryCq??????????????Query?cq?command.
???sendMessage??????????Send?a?message
???consumeMessage???????Consume?message
See?'mqadmin?help?<command>'?for?more?information?on?a?specific?command.
Java?HotSpot(TM)?64-Bit?Server?VM?warning:?ignoring?option?PermSize=128m;?support?was?removed?in?8.0
Java?HotSpot(TM)?64-Bit?Server?VM?warning:?ignoring?option?MaxPermSize=128m;?support?was?removed?in?8.0
啟動(dòng)成功后先簡(jiǎn)單執(zhí)行一個(gè)命令(在 linux 上去掉 .cmd 即可)
- 查詢 topic 列表
mqadmin.cmd topicList -n '127.0.0.1:9876'
生產(chǎn)環(huán)境 ACL 權(quán)限認(rèn)證
修改配置
使用默認(rèn)配置,像上一節(jié)我們就可以啟動(dòng)成功。但是在生產(chǎn)環(huán)境中,我們一定要加權(quán)限認(rèn)證
打開(kāi) aclEnable 開(kāi)關(guān)
在文件 conf/broker.conf 新增 aclEnable=true
新建文件 conf/plain_acl.yml,填入如下文本
accounts:
-?accessKey:?RocketMQ
??secretKey:?12345678
??whiteRemoteAddress:
??admin:?false
??defaultTopicPerm:?DENY
??defaultGroupPerm:?SUB
??topicPerms:
??-?TopicTest=PUB
??groupPerms:
??#?the?group?should?convert?to?retry?topic
??-?oms_consumer_group=DENY
-?accessKey:?admin
??secretKey:?12345678
??whiteRemoteAddress:
??#?if?it?is?admin,?it?could?access?all?resources
??admin:?true?
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
名詞介紹
globalWhiteRemoteAddresses
全局白名單,其類型為數(shù)組,即支持多個(gè)配置。其支持的配置格式如下:
空 表示不設(shè)置白名單,該條規(guī)則默認(rèn)返回false。“” 表示全部匹配,該條規(guī)則直接返回true,將會(huì)阻斷其他規(guī)則的判斷,請(qǐng)慎重使用。192.168.0.{100,101} 多地址配置模式,ip地址的最后一組,使用{},大括號(hào)中多個(gè)ip地址,用英文逗號(hào)(,)隔開(kāi)。192.168.1.100,192.168.2.100 直接使用,分隔,配置多個(gè)ip地址。192.168..或192.168.100-200.10-20 每個(gè)IP段使用 "" 或"-"表示范圍。
accounts
配置用戶信息,該類型為數(shù)組類型。擁有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。
accessKey 登錄用戶名,長(zhǎng)度必須大于6個(gè)字符。
secretKey 登錄密碼。長(zhǎng)度必須大于6個(gè)字符。
whiteRemoteAddress 用戶級(jí)別的IP地址白名單。其類型為一個(gè)字符串,其配置規(guī)則與globalWhiteRemoteAddresses,但只能配置一條規(guī)則。
admin boolean類型,設(shè)置是否是admin。如下權(quán)限只有admin=true時(shí)才有權(quán)限執(zhí)行。
UPDATE_AND_CREATE_TOPIC 更新或創(chuàng)建主題。UPDATE_BROKER_CONFIG 更新Broker配置。DELETE_TOPIC_IN_BROKER 刪除主題。UPDATE_AND_CREATE_SUBSCRIPTIONGROUP 更新或創(chuàng)建訂閱組信息。DELETE_SUBSCRIPTIONGROUP 刪除訂閱組信息。5. defaultTopicPerm 默認(rèn)topic權(quán)限。該值默認(rèn)為DENY(拒絕)。
defaultGroupPerm 默認(rèn)消費(fèi)組權(quán)限,該值默認(rèn)為DENY(拒絕),建議值為SUB。
topicPerms 設(shè)置topic的權(quán)限。其類型為數(shù)組,其可選擇值在下節(jié)介紹。
groupPerms 設(shè)置消費(fèi)組的權(quán)限。其類型為數(shù)組,其可選擇值在下節(jié)介紹。可以為每一消費(fèi)組配置不一樣的權(quán)限。
五、RocketMq 集群
RocketMq 集群、拓展閱讀
參考:https://www.jianshu.com/p/2838890f3284
六、SpringBoot 集成 RocketMq
代碼會(huì)同步到 GitHub,回復(fù)github領(lǐng)取
七、SpringMVC 架構(gòu)中使用 Demo
參考
安裝部署參考博客:https://www.imooc.com/article/290089
完整中文參考文章:http://www.itmuch.com/books/rocketmq/operation.html
阿里文檔參考:https://help.aliyun.com/document_detail/29533.html
我是pub哥,Java服務(wù)端工程師,對(duì)大數(shù)據(jù)懂一點(diǎn),三觀要正。我們下期再見(jiàn)
財(cái)經(jīng)相關(guān):
最近幾天白酒、半導(dǎo)體、新能源都在上漲,應(yīng)該有很多被洗人,投資建議,有持倉(cāng)的韭菜,下跌再拋出。
總結(jié)
以上是生活随笔為你收集整理的rocketmq删除topic_RocketMq 快速入门教程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: jq苹果手机全屏下点击无效果_苹果系统自
- 下一篇: 在fritzing中怎么导入_电路图制作