rocketmq 消息指定_进大厂必备的RocketMQ你会吗?
點擊關注"故里學Java"
右上角"設為星標"好文章不錯過
關于消息隊列,相信大家都不陌生,現在的中大型項目中或多或少都有使用到消息隊列,對于消息隊列大家可能都有一定的了解,使用消息隊列可以解決什么樣的問題,又會帶來哪些問題相信也有了解,在前邊的文章《消息隊列(一)》中有介紹,感興趣的小伙伴可以點擊文章名直接打開。
今天我們主要介紹一下RocketMQ,關于RocketMQ很多人只知道是阿里開源的一款MQ中間件,實際工作中還是用的RabblitMQ,本文以及接下來幾篇文章,我會分享一下RocketMQ相關的知識,詳細的介紹一下RocketMQ,希望可以幫助到需要的朋友們。
RocketMQ基本概念
- Producer:消息生產者,負責生產消息,一般由業務系統負責生產消息,消息生產者把業務應用產生的消息發送到broker服務器,
- Consumer:消息消費者,從broker服務器拉取消息或者接收broker推送的消息進行消費處理。
- Broker:消息中轉的角色,負責存儲消息、轉發消息。
- Name Server:充當路由消息的提供者,生產者或消費者可以通過名字查找所需broker的IP列表,集群部署的時候,各個NameServer實例是相互獨立的,沒有信息交換。
- Topic:表示一類消息的集合,是RocketMQ進行消息訂閱的基本單位。
- Message Queue:用于存儲消息的物理地址,每個Topic中是消息地址存儲于多個Message Queue中。
- Message:消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬于一個主題。
RocketMQ的特性
RocketMQ的功能特性很多,這里只簡單介紹幾個:
指的是可以按照消息的發送順序來消費,有時候一組消息需要按照指定的順序消費才有意義,但是多個消費者是并行消費的,RocketMQ可以嚴格的保證消息的有序。順序消息可以分為兩類:全局順序消息和分區順序消息。全局順序是指某個topic下所有的消息都要保證順序,部分順序消息只要保證每一組消息被順序消費。
- 全局順序消費:對于指定的一個Topic,所有的消息按照嚴格的先入先出的順序依次進行發布和消費
- 分區順序消費:對于指定的一個Topic,所有的消息根據sharding key進行區塊分區,同一個分區的消息按照先入先出的順序進行消費。性能較全局順序高。
RocketMQ事務消息是指應用本地事務和發送消息操作可以被定義在全局事務中,要么同時成功,要么同時失敗,RocketMQ的事務消息提供了類似X/Open XA的分布式事務功能,通過事務消息能達到分布式事務的最終一致性。
定時消息是指消息發送到broker后,不會立即消費,等到設定的設定的實際才會投遞給真正的topic。broker有配置項messageDelayLevel,默認值有1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,也可以配置自定義的messageDelayLevel,在發送消息的時候,設置delayLevel等級即可:msg.setDelayLevel(level)。
定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發送延遲的消息能夠順序消費。broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。
回溯消息是指消費者以及消費成功,由于業務需要,需要重新消費,如果要實現此功能,Broker在向消費者投遞成功消息后,消息仍然需要保留。重新消費一般是有一定時間緯度的,RocketMQ支持按照時間回溯消息,時間維度可以精確到毫秒。
生產者在發送消息時,同步消費失敗會重投,異步消息有重試,oneway沒有任何保證。消息重投可以最大限度的保證消息發送成功、不丟失,但是也會導致消息重復,當消息量大、網絡不好的時候消息重復的概率就會提高。
我們可以根據需要設置消息重試策略:
- retryTimesWhenSendFailed:同步發送失敗重投次數,默認為2 ,在重投的時候不會發送給上次失敗的broker,會嘗試向其他的broker發送,盡可能的保證消息不丟失。
- retryTimesWhenSendAsyncFailed:異步發送失敗重試次數,異步的重試還是選擇上次的broker,不會選擇其他的broker,不能保證消息不丟失。
- retryAnotherBrokerWhenNotStoreOK:消費刷盤超時或者slave不可用,是否嘗試發送給其他的broker,默認為false,非常重要的消息我們可以開啟。
死信隊列用于處理消費失敗的消息,當消息消費失敗的時候,會自動進行消息重試,如果達到最大重試次數后,還是沒有消費成功,就說明正常情況下不能正確的消費該消息,此時消息隊列會把這個消息發送到該消費者對應的特殊隊列中。RocketMQ將這種消息稱為死信消息,將這種存儲死信消息的隊列稱為死信隊列,可以通過console控制臺對死信隊列中的消息進行重發。
生產者流控,一般是因為broker處理能力達到了上限。消費者流控,一般是因為消費者消費能力達到了上限。
生產者流控:
- commitLog文件被鎖時間超過osPageCacheBusyTimeOutMills時,參數默認為1000ms,返回流控。
- 如果開啟transientStorePoolEnable == true,且broker為異步刷盤的主機,且transientStorePool中資源不足,拒絕當前send請求,返回流控。
- broker每隔10ms檢查send請求隊列頭部請求的等待時間,如果超過waitTimeMillsInSendQueue,默認200ms,拒絕當前send請求,返回流控。
- broker通過拒絕send 請求方式實現流量控制。
生產者流控不會嘗試消息重投。
消費者流控:
- 消費者本地緩存消息數超過pullThresholdForQueue時,默認1000。
- 消費者本地緩存消息大小超過pullThresholdSizeForQueue時,默認100MB。
- 消費者本地緩存消息跨度超過consumeConcurrentlyMaxSpan時,默認2000。
消費者流控會降低拉取頻率。
RocketMQ技術架構
以多Master多Slave模式為例:
我們可以看到,RocketMQ架構上分為四部分,分別為Producer、Consumer、NameServer、Broker。- Producer:消息發布的角色,支持分布式集群方式部署。Producer通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。
- Consumer:消息消費的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數用戶的需求。
- NameServer:NameServer是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態注冊與發現。主要包括兩個功能:Broker管理,NameServer接受Broker集群的注冊信息并且保存下來作為路由信息的基本數據。然后提供心跳檢測機制,檢查Broker是否還存活;路由信息管理,每個NameServer將保存關于Broker集群的整個路由信息和用于客戶端查詢的隊列信息。然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。NameServer通常也是集群的方式部署,各實例間相互不進行信息通訊。Broker是向每一臺NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態感知Broker的路由的信息。
- Broker:Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證,為了實現這些功能,Broker包含了以下幾個重要子模塊。
RocketMQ部署架構
以多Master多Slave模式為例:
集群的工作流程;
- 啟動NameServer,NameServer起來后監聽端口,等待Broker、Producer、Consumer連上來,相當于一個路由控制中心。
- Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關系。
- 收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。
- Producer發送消息,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發消息。
- Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。
今天的分享暫時告一段落,后邊還有多篇文章繼續分享,理論分享結束會有一個demo來演示各個場景。
- END -
往期回顧
? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
大家好,我是"故里學Java"公號作者,你可以叫我"故里"。
一直堅信技術能改變生活,愿保持初心,加油技術人!
點個贊,證明你還愛我
總結
以上是生活随笔為你收集整理的rocketmq 消息指定_进大厂必备的RocketMQ你会吗?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python pdf转txt保留全部信息
- 下一篇: java将mysql数据写入到txt_j