使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
作者 | 遼天
來源 | 阿里巴巴云原生公眾號
導(dǎo)讀:本文將 rocktmq-spring-boot 的設(shè)計實現(xiàn)做一個簡單的介紹,讀者可以通過本文了解將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發(fā)細節(jié),然后通過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發(fā)送和消費 RocketMQ 消息。
在 Spring 生態(tài)中玩轉(zhuǎn) RocketMQ 系列文章:
- 《如何在 Spring 生態(tài)中玩轉(zhuǎn) RocketMQ?》
- 《羅美琪和春波特的故事…》
- 《RocketMQ-Spring 畢業(yè)兩周年,為什么能成為 Spring 生態(tài)中最受歡迎的 messaging 實現(xiàn)?》
本文配套可交互教程已登錄阿里云知行動手實驗室,PC 端登錄 start.aliyun.com 在瀏覽器中立即體驗。
通過本文,您將了解到:
- Spring 的消息框架介紹
- rocketmq-spring-boot 具體實現(xiàn)
- 使用示例
前言
上世紀(jì) 90 年代末,隨著 Java EE(Enterprise Edition) 的出現(xiàn),特別是 Enterprise Java Beans 的使用需要復(fù)雜的描述符配置和死板復(fù)雜的代碼實現(xiàn),增加了廣大開發(fā)者的學(xué)習(xí)曲線和開發(fā)成本,由此基于簡單的 XML 配置和普通 Java 對象(Plain Old Java Objects)的 Spring 技術(shù)應(yīng)運而生,依賴注入(Dependency Injection), 控制反轉(zhuǎn)(Inversion of Control)和面向切面編程(AOP)的技術(shù)更加敏捷地解決了傳統(tǒng) Java 企業(yè)及版本的不足。
隨著 Spring 的持續(xù)演進,基于注解(Annotation)的配置逐漸取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot 1.0.0 正式發(fā)布,它基于“約定大于配置”(Convention over configuration)這一理念來快速地開發(fā)、測試、運行和部署 Spring 應(yīng)用,并能通過簡單地與各種啟動器(如 spring-boot-web-starter)結(jié)合,讓應(yīng)用直接以命令行的方式運行,不需再部署到獨立容器中。這種簡便直接快速構(gòu)建和開發(fā)應(yīng)用的過程,可以使用約定的配置并且簡化部署,受到越來越多的開發(fā)者的歡迎。
Apache RocketMQ 是業(yè)界知名的分布式消息和流處理中間件,簡單地理解,它由 Broker 服務(wù)器和客戶端兩部分組成:
其中客戶端一個是消息發(fā)布者客戶端(Producer),它負責(zé)向 Broker 服務(wù)器發(fā)送消息;另外一個是消息的消費者客戶端(Consumer),多個消費者可以組成一個消費組,來訂閱和拉取消費 Broker 服務(wù)器上存儲的消息。
為了利用 Spring Boot 的快速開發(fā)和讓用戶能夠更靈活地使用 RocketMQ 消息客戶端,Apache RocketMQ 社區(qū)推出了 spring-boot-starter 實現(xiàn)。隨著分布式事務(wù)消息功能在 RocketMQ 4.3.0 版本的發(fā)布,近期升級了相關(guān)的 spring-boot 代碼,通過注解方式支持分布式事務(wù)的回查和事務(wù)消息的發(fā)送。
本文將對當(dāng)前的設(shè)計實現(xiàn)做一個簡單的介紹,讀者可以通過本文了解將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發(fā)細節(jié),然后通過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發(fā)送和消費 RocketMQ 消息。
Spring 中的消息框架
順便在這里討論一下在 Spring 中關(guān)于消息的兩個主要的框架,即 Spring Messaging 和 Spring Cloud Stream。它們都能夠與 Spring Boot 整合并提供了一些參考的實現(xiàn)。和所有的實現(xiàn)框架一樣,消息框架的目的是實現(xiàn)輕量級的消息驅(qū)動的微服務(wù),可以有效地簡化開發(fā)人員對消息中間件的使用復(fù)雜度,讓系統(tǒng)開發(fā)人員可以有更多的精力關(guān)注于核心業(yè)務(wù)邏輯的處理。
1. Spring Messaging
Spring Messaging 是 Spring Framework 4 中添加的模塊,是 Spring 與消息系統(tǒng)集成的一個擴展性的支持。它實現(xiàn)了從基于 JmsTemplate 的簡單的使用 JMS 接口到異步接收消息的一整套完整的基礎(chǔ)架構(gòu),Spring AMQP 提供了該協(xié)議所要求的類似的功能集。在與 Spring Boot 的集成后,它擁有了自動配置能力,能夠在測試和運行時與相應(yīng)的消息傳遞系統(tǒng)進行集成。
單純對于客戶端而言,Spring Messaging 提供了一套抽象的 API 或者說是約定的標(biāo)準(zhǔn),對消息發(fā)送端和消息接收端的模式進行規(guī)定,不同的消息中間件提供商可以在這個模式下提供自己的 Spring 實現(xiàn):在消息發(fā)送端需要實現(xiàn)的是一個 XXXTemplate 形式的 Java Bean,結(jié)合 Spring Boot 的自動化配置選項提供多個不同的發(fā)送消息方法;在消息的消費端是一個 XXXMessageListener 接口(實現(xiàn)方式通常會使用一個注解來聲明一個消息驅(qū)動的 POJO),提供回調(diào)方法來監(jiān)聽和消費消息,這個接口同樣可以使用 Spring Boot 的自動化選項和一些定制化的屬性。
如果有興趣深入的了解 Spring Messaging 及針對不同的消息產(chǎn)品的使用,推薦閱讀這個文件。參考 Spring Messaging 的既有實現(xiàn),RocketMQ 的 spring-boot-starter 中遵循了相關(guān)的設(shè)計模式并結(jié)合 RocketMQ 自身的功能特點提供了相應(yīng)的 API(如順序、異步和事務(wù)半消息等)。
2. Spring Cloud Stream
Spring Cloud Stream 結(jié)合了 Spring Integration 的注解和功能,它的應(yīng)用模型如下:
該圖片引自 spring cloud stream
Spring Cloud Stream 框架中提供一個獨立的應(yīng)用內(nèi)核,它通過輸入(@Input)和輸出(@Output)通道與外部世界進行通信,消息源端(Source)通過輸入通道發(fā)送消息,消費目標(biāo)端(Sink)通過監(jiān)聽輸出通道來獲取消費的消息。這些通道通過專用的 Binder 實現(xiàn)與外部代理連接。開發(fā)人員的代碼只需要針對應(yīng)用內(nèi)核提供的固定的接口和注解方式進行編程,而不需要關(guān)心運行時具體的 Binder 綁定的消息中間件。在運行時,Spring Cloud Stream 能夠自動探測并使用在 classpath 下找到的Binder。
這樣開發(fā)人員可以輕松地在相同的代碼中使用不同類型的中間件:僅僅需要在構(gòu)建時包含進不同的 Binder。在更加復(fù)雜的使用場景中,也可以在應(yīng)用中打包多個 Binder 并讓它自己選擇 Binder,甚至在運行時為不同的通道使用不同的 Binder。
Binder 抽象使得 Spring Cloud Stream 應(yīng)用可以靈活的連接到中間件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的靈活配置配置能力,這樣的配置可以通過外部配置的屬性和 Spring Boot 支持的任何形式來提供(包括應(yīng)用啟動參數(shù)、環(huán)境變量和 application.yml 或者 application.properties 文件),部署人員可以在運行時動態(tài)選擇通道連接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。
Binder SPI 的方式來讓消息中間件產(chǎn)品使用可擴展的 API 來編寫相應(yīng)的 Binder,并集成到 Spring Cloud Steam 環(huán)境,目前 RocketMQ 還沒有提供相關(guān)的 Binder,我們計劃在下一步將完善這一功能,也希望社區(qū)里有這方面經(jīng)驗的同學(xué)積極嘗試,貢獻 PR 或建議。
spring-boot-starter的實現(xiàn)
在開始的時候我們已經(jīng)知道,spring boot starter 構(gòu)造的啟動器對于使用者是非常方便的,使用者只要在 pom.xml引入starter 的依賴定義,相應(yīng)的編譯,運行和部署功能就全部自動引入。因此常用的開源組件都會為 Spring 的用戶提供一個 spring-boot-starter 封裝給開發(fā)者,讓開發(fā)者非常方便集成和使用,這里我們詳細的介紹一下 RocketMQ(客戶端)的 starter 實現(xiàn)過程。
1. spring-boot-starter 的實現(xiàn)步驟
對于一個 spring-boot-starter 實現(xiàn)需要包含如下幾個部分:
1)在 pom.xml 的定義
- 定義最終要生成的 starter 組件信息
- 定義依賴包
它分為兩個部分:Spring 自身的依賴包和 RocketMQ 的依賴包。
2)配置文件類
定義應(yīng)用屬性配置文件類 RocketMQProperties,這個 Bean 定義一組默認的屬性值。用戶在使用最終的 starter 時,可以根據(jù)這個類定義的屬性來修改取值,當(dāng)然不是直接修改這個類的配置,而是 spring-boot 應(yīng)用中對應(yīng)的配置文件:src/main/resources/application.properties。
3)定義自動加載類
定義 src/resources/META-INF/spring.factories 文件中的自動加載類, 其目的是讓 spring boot 更具文中中所指定的自動化配置類來自動初始化相關(guān)的 Bean、Component 或 Service,它的內(nèi)容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration在 RocketMQAutoConfiguration 類的具體實現(xiàn)中,定義開放給用戶直接使用的 Bean 對象包括:
- RocketMQProperties 加載應(yīng)用屬性配置文件的處理類;
- RocketMQTemplate 發(fā)送端用戶發(fā)送消息的發(fā)送模板類;
- ListenerContainerConfiguration 容器 Bean 負責(zé)發(fā)現(xiàn)和注冊消費端消費實現(xiàn)接口類,這個類要求:由 @RocketMQMessageListener 注解標(biāo)注;實現(xiàn) RocketMQListener 泛化接口。
4)最后具體地進行?RpcketMQ 相關(guān)的封裝
在發(fā)送端(producer)和消費端(consumer)客戶端分別進行封裝,在當(dāng)前的實現(xiàn)版本提供了對 Spring Messaging 接口的兼容方式。
2. 消息發(fā)送端實現(xiàn)
1)普通發(fā)送端
發(fā)送端的代碼封裝在 RocketMQTemplate POJO 中,下圖是發(fā)送端的相關(guān)代碼的調(diào)用關(guān)系圖:
為了與 Spring Messaging 的發(fā)送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象類,來支持相關(guān)的消息轉(zhuǎn)換和發(fā)送方法,這些方法最終會代理給 doSend() 方法、doSend() 以及 RocoketMQ 所特有的一些方法如異步,單向和順序等方法直接添加到 RoketMQTempalte 中,這些方法直接代理調(diào)用到 RocketMQ 的 Producer API 來進行消息的發(fā)送。
2)事務(wù)消息發(fā)送端
對于事務(wù)消息的處理,在消息發(fā)送端進行了部分的擴展,參考上面的調(diào)用關(guān)系類圖。
RocketMQTemplate 里加入了一個發(fā)送事務(wù)消息的方法 sendMessageInTransaction(),并且最終這個方法會代理到 RocketMQ 的 TransactionProducer 進行調(diào)用,在這個 Producer 上會注冊其關(guān)聯(lián)的 TransactionListener 實現(xiàn)類,以便在發(fā)送消息后能夠?qū)?TransactionListener 里的方法實現(xiàn)進行調(diào)用。
3. 消息消費端實現(xiàn)
在消費端 Spring-Boot 應(yīng)用啟動后,會掃描所有包含 @RocketMQMessageListener 注解的類(這些類需要集成 RocketMQListener 接口,并實現(xiàn) onMessage()方法),這個 Listener 會一對一的被放置到。
DefaultRocketMQListenerContainer 容器對象中,容器對象會根據(jù)消費的方式(并發(fā)或順序),將 RocketMQListener 封裝到具體的 RocketMQ 內(nèi)部的并發(fā)或者順序接口實現(xiàn)。在容器中創(chuàng)建 RocketMQ Consumer 對象,啟動并監(jiān)聽定制的 Topic 消息,如果有消費消息,則回調(diào)到 Listener 的 onMessage() 方法。
使用示例
上面的一章介紹了 RocketMQ 在 spring-boot-starter 方式的實現(xiàn),這里通過一個最簡單的消息發(fā)送和消費的例子來介紹如何使這個 rocketmq-spring-boot-starter。
1. RocketMQ 服務(wù)端的準(zhǔn)備
1)啟動 NameServer 和 Broker
要驗證 RocketMQ 的 Spring-Boot 客戶端,首先要確保 RocketMQ 服務(wù)正確的下載并啟動。可以參考 RocketMQ 主站的快速開始來進行操作。確保啟動 NameServer 和 Broker 已經(jīng)正確啟動。
2)創(chuàng)建實例中所需要的 Topics
在執(zhí)行啟動命令的目錄下執(zhí)行下面的命令行操作:
bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic2. 編譯 rocketmq-spring-boot-starter
目前的 spring-boot-starter 依賴還沒有提交的 Maven 的中心庫,用戶使用前需要自行下載 git 源碼,然后執(zhí)行 mvn clean install 安裝到本地倉庫。
git clone https://github.com/apache/rocketmq-externals.git cd rocketmq-spring-boot-starter mvn clean install3. 編寫客戶端代碼
用戶如果使用它,需要在消息的發(fā)布和消費客戶端的 maven 配置文件 pom.xml 中添加如下的依賴:
屬性 spring-boot-starter-rocketmq-version 的取值為:1.0.0-SNAPSHOT, 這與上一步驟中執(zhí)行安裝到本地倉庫的版本一致。
1)消息發(fā)送端的代碼
發(fā)送端的配置文件 application.properties:
發(fā)送端的 Java 代碼:
2)消息消費端代碼
消費端的配置文件 application.properties:
消費端的 Java 代碼:
這里只是簡單的介紹了使用 spring-boot 來編寫最基本的消息發(fā)送和接收的代碼,如果需要了解更多的調(diào)用方式,如: 異步發(fā)送,對象消息體,指定 tag 標(biāo)簽以及指定事務(wù)消息,請參看 github 的說明文檔和詳細的代碼。我們后續(xù)還會對這些高級功能進行陸續(xù)的介紹。
作者簡介
遼天,阿里巴巴技術(shù)專家,Apache RocketMQ 內(nèi)核控,擁有多年分布式系統(tǒng)研發(fā)經(jīng)驗,對 Microservice、Messaging 和 Storage 等領(lǐng)域有深刻理解, 目前專注 RocketMQ 內(nèi)核優(yōu)化以及 Messaging 生態(tài)建設(shè)。
在 PC 端登錄 start.aliyun.com 知行動手實驗室,沉浸式體驗在線交互教程。
總結(jié)
以上是生活随笔為你收集整理的使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 面对大规模 K8s 集群,如何先于用户发
- 下一篇: What‘s new in dubbo-