javascript
Spring Cloud Alibaba RocketMQ 快速入门
Spring Cloud Alibaba 消息隊列 RocketMQ 入門_weixin_42073629的博客-CSDN博客_spring cloud alibaba rocketmq1. 概述本文我們來學習Spring Cloud Alibaba提供的Spring Cloud Stream RocketMQ組件,基于Spring Cloud Stream的編程模型,接入 RocketMQ 作為消息中間件,實現消息驅動的微服務。RocketMQ是一款開源的分布式消息系統,基于高可用分布式集群技術,提供低延時的、高可靠的消息發布與訂閱服務。同時,廣泛應用于多個領域,包括異步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用...https://blog.csdn.net/weixin_42073629/article/details/106535675
?
1. 概述
本文我們來學習?Spring Cloud Alibaba?提供的?Spring Cloud Stream RocketMQ?組件,基于?Spring Cloud Stream?的編程模型,接入 RocketMQ 作為消息中間件,實現消息驅動的微服務。
RocketMQ?是一款開源的分布式消息系統,基于高可用分布式集群技術,提供低延時的、高可靠的消息發布與訂閱服務。同時,廣泛應用于多個領域,包括異步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用、手游、視頻、物聯網、車聯網等。
具有以下特點:
- 能夠保證嚴格的消息順序
- 提供豐富的消息拉取模式
- 高效的訂閱者水平擴展能力
- 實時的消息訂閱機制
- 億級消息堆積能力
在開始本文之前,胖友需要對 RocketMQ 進行簡單的學習。可以閱讀《RocketMQ 極簡入門》文章,將第一二小節看完,在本機搭建一個 RocketMQ 服務。
2. Spring Cloud Stream 介紹
Spring Cloud Stream?是一個用于構建基于消息的微服務應用框架,使用?Spring Integration?與 Broker 進行連接。
友情提示:可能有胖友對 Broker 不太了解,我們來簡單解釋下。
一般來說,消息隊列中間件都有一個?Broker Server(代理服務器),消息中轉角色,負責存儲消息、轉發消息。
例如說在 RocketMQ 中,Broker 負責接收從生產者發送來的消息并存儲、同時為消費者的拉取請求作準備。另外,Broker 也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
Spring Cloud Stream 提供了消息中間件的統一抽象,推出了 publish-subscribe、consumer groups、partition 這些統一的概念。
Spring Cloud Stream 內部有兩個概念:Binder?和?Binding。
①?Binder,跟消息中間件集成的組件,用來創建對應的 Binding。各消息中間件都有自己的 Binder 具體實現。
public interface Binder<T, C extends ConsumerProperties, // 消費者配置P extends ProducerProperties> { // 生產者配置// 創建消費者的 BindingBinding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);// 創建生產者的 BindingBinding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);}- Kafka 實現了?KafkaMessageChannelBinder
- RabbitMQ 實現了?RabbitMessageChannelBinder
- RocketMQ 實現了?RocketMQMessageChannelBinder
②?Binding,包括 Input Binding 和 Output Binding。Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋梁,實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據即可,屏蔽了開發者與底層消息中間件的接觸。
最終整體交互如下圖所示:
可能看完之后,胖友對 Spring Cloud Stream 還是有點懵逼,并且覺得概念怎么這么多呢?不要慌,我們先來快速入個門,會有更加具象的感受。
3. 快速入門
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-demo
- 消費者:labx-06-sca-stream-rocketmq-consumer-demo
本小節,我們一起來快速入門下,會創建 2 個項目,分別作為生產者和消費者。最終項目如下圖所示:
3.1 搭建生產者
創建?labx-06-sca-stream-rocketmq-producer-demo?項目,作為生產者。
3.1.1 引入依賴
創建?pom.xml?文件中,引入 Spring Cloud Alibaba RocketMQ 相關依賴。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>labx-06</artifactId><groupId>cn.iocoder.springboot.labs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>labx-06-sca-stream-rocketmq-producer-demo</artifactId><properties><maven.compiler.target>1.8</maven.compiler.target><maven.compiler.source>1.8</maven.compiler.source><spring.boot.version>2.2.4.RELEASE</spring.boot.version><spring.cloud.version>Hoxton.SR1</spring.cloud.version><spring.cloud.alibaba.version>2.2.0.RELEASE</spring.cloud.alibaba.version></properties><!--引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,進行依賴版本的管理,防止不兼容。在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 開發團隊推薦了三者的依賴關系--><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring.cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring.cloud.alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!-- 引入 SpringMVC 相關依賴,并實現對其的自動配置 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相關依賴,將 RocketMQ 作為消息隊列,并實現對其的自動配置 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency></dependencies></project>通過引入?spring-cloud-starter-stream-rocketmq?依賴,引入并實現 RocketMQ 的自動配置。在該依賴中,已經幫我們自動引入 RocketMQ 的大量依賴,非常方便,如下圖所示:
3.1.2 配置文件
創建?application.yaml?配置文件,添加 Spring Cloud Alibaba RocketMQ 相關配置。
spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSON# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: test # 生產者分組sync: true # 是否同步發送消息,默認為 false 異步。server:port: 18080①?spring.cloud.stream?為 Spring Cloud Stream 配置項,對應?BindingServiceProperties?類。配置的層級有點深,我們一層一層來看看。
②?spring.cloud.stream.bindings?為 Binding 配置項,對應?BindingProperties?Map。其中,key?為 Binding 的名字。要注意,雖然說 Binding 分成 Input 和 Output 兩種類型,但是在配置項中并不會體現出來,而是要在稍后搭配?@Input?還是?@Output?注解,才會有具體的區分。
這里,我們配置了一個名字為?demo01-output?的 Binding。從命名上,我們的意圖是想作為 Output Binding,用于生產者發送消息。
-
destination:目的地。在 RocketMQ 中,使用 Topic 作為目的地。這里我們設置為?DEMO-TOPIC-01。
主題(Topic):表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是 RocketMQ 進行消息訂閱的基本單位。
-
content-type:內容格式。這里使用 JSON 格式,因為稍后我們將發送消息的類型為 POJO,使用 JSON 進行序列化。
③?spring.cloud.stream.rocketmq?為 Spring Cloud Stream RocketMQ 配置項。
④?spring.cloud.stream.rocketmq.binder?為 RocketMQ Binder 配置項,對應?RocketMQBinderConfigurationProperties?類。
-
name-server:RocketMQ Namesrv 地址。
名字服務(Name Server):名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的 Broker IP 列表。多個 Namesrv 實例組成集群,但相互獨立,沒有信息交換。
⑤?spring.cloud.stream.rocketmq.bindings?為 RocketMQ?自定義?Binding 配置項,用于對通用的?spring.cloud.stream.bindings?配置項的增強,實現 RocketMQ Binding 獨特的配置。該配置項對應?RocketMQBindingProperties?Map,其中?key?為 Binding 的名字,需要對應上噢。
這里,我們對名字為?demo01-output?的 Binding 進行增強,進行 Producer 的配置。其中,producer?為 RocketMQ Producer 配置項,對應?RocketMQProducerProperties?類。
-
group:生產者分組。
生產者組(Producer Group):同一類 Producer 的集合,這類 Producer 發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則 Broker 服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。
-
sync:是否同步發送消息,默認為?false?異步。一般業務場景下,使用同步發送消息較多,所以這里我們設置為?true?同步消息。
使用 RocketMQ 發送三種類型的消息:同步消息(sync)、異步消息(async)和單向消息(oneway)。其中前兩種消息是可靠的,因為會有發送是否成功的應答。
3.1.3 MySource
創建?MySource?接口,聲明名字為 Output Binding。代碼如下:
public interface MySource {@Output("demo01-output")MessageChannel demo01Output();}這里,我們通過?@Output?注解,聲明了一個名字為?demo01-output?的 Output Binding。注意,這個名字要和我們配置文件中的?spring.cloud.stream.bindings?配置項對應上。
同時,@Output?注解的方法的返回結果為?MessageChannel?類型,可以使用它發送消息。MessageChannel 提供的發送消息的方法如下:
@FunctionalInterface public interface MessageChannel {long INDEFINITE_TIMEOUT = -1;default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);}boolean send(Message<?> message, long timeout);}那么,我們是否要實現 MySource 接口呢?答案是不需要,全部交給 Spring Cloud Stream 的?BindableProxyFactory?來解決。BindableProxyFactory 會通過動態代理,自動實現 MySource 接口。 而?@Output?注解的方法的返回值,BindableProxyFactory 會掃描帶有?@Output?注解的方法,自動進行創建。
例如說,#demo01Output()?方法被自動創建返回結果為?DirectWithAttributesChannel,它是 MessageChannel 的子類。
友情提示:感興趣的胖友,可以在 BindableProxyFactory 的?#afterPropertiesSet()?和?#invoke(MethodInvocation invocation)?方法上,都打上一個斷點,然后進行愉快的調試。
3.1.4 Demo01Message
創建?Demo01Message?類,示例 Message 消息。代碼如下:
public class Demo01Message {/*** 編號*/private Integer id;// ... 省略 setter/getter/toString 方法}3.1.5 Demo01Controller
創建?Demo01Controller?類,提供發送消息的 HTTP 接口。代碼如下:
@RestController @RequestMapping("/demo01") public class Demo01Controller {@Autowiredprivate MySource mySource; // <X>@GetMapping("/send")public boolean send() {// <1> 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// <2> 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();// <3> 發送消息return mySource.demo01Output().send(springMessage);}}- <X>?處,使用?@Autowired?注解,注入 MySource Bean。
- <1>?處,創建 Demo01Message 對象。
- <2>?處,使用?MessageBuilder?創建 Spring?Message?對象,并設置消息內容為 Demo01Message 對象。
- <3>?處,通過 MySource 獲得 MessageChannel 對象,然后發送消息。
3.1.6 ProducerApplication
創建?ProducerApplication?類,啟動應用。代碼如下:
@SpringBootApplication @EnableBinding(MySource.class) public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}}使用?@EnableBinding?注解,聲明指定接口開啟 Binding 功能,掃描其?@Input?和?@Output?注解。這里,我們設置為 MySource 接口。
3.2 搭建消費者
創建?labx-06-sca-stream-rocketmq-consumer-demo?項目,作為消費者。
3.2.1 引入依賴
創建?pom.xml?文件中,引入 Spring Cloud Alibaba RocketMQ 相關依賴。
友情提示:和「3.1.1 引入依賴」基本一樣,點擊?鏈接?查看。
3.2.2 配置文件
創建?application.yaml?配置文件,添加 Spring Cloud Alibaba RocketMQ 相關配置。
spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者總體來說,和「3.1.2 配置文件」是比較接近的,所以我們只說差異點噢。
①?spring.cloud.stream.bindings?為 Binding 配置項。
這里,我們配置了一個名字為?demo01-input?的 Binding。從命名上,我們的意圖是想作為 Input Binding,用于消費者消費消息。
-
group:消費者分組。
消費者組(Consumer Group):同一類 Consumer 的集合,這類 Consumer 通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的 Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。
②?spring.cloud.stream.rocketmq.bindings?為 RocketMQ?自定義?Binding 配置項。
這里,我們對名字為?demo01-input?的 Binding 進行增強,進行 Consumer 的配置。其中,consumer?為 RocketMQ Producer 配置項,對應?RocketMQConsumerProperties?類。
-
enabled:是否開啟消費,默認為?true。在日常開發時,如果在本地環境不想消費,可以通過設置?enabled?為?false?進行關閉。
-
broadcasting: 是否使用廣播消費,默認為?false?使用集群消費。
- 集群消費(Clustering):集群消費模式下,相同 Consumer Group 的每個 Consumer 實例平均分攤消息。
- 廣播消費(Broadcasting):廣播消費模式下,相同 Consumer Group 的每個 Consumer 實例都接收全量的消息。
這里一點要注意!!!艿艿加了三個感嘆號,一定要理解集群消費和廣播消費的差異。我們來舉個例子,以有兩個消費者分組 A 和 B 的場景舉例子:
- 假設每個消費者分組各啟動一個實例,此時我們發送一條消息,該消息會被兩個消費者分組?"consumer_group_01"?和?"consumer_group_02"?都各自消費一次。
- 假設每個消費者分組各啟動一個實例,此時我們發送一條消息,該消息會被分組 A 的某個實例消費一次,被分組 B 的某個實例也消費一次
通過集群消費的機制,我們可以實現針對相同 Topic ,不同消費者分組實現各自的業務邏輯。例如說:用戶注冊成功時,發送一條 Topic 為?"USER_REGISTER"?的消息。然后,不同模塊使用不同的消費者分組,訂閱該 Topic ,實現各自的拓展邏輯:
- 積分模塊:判斷如果是手機注冊,給用戶增加 20 積分。
- 優惠劵模塊:因為是新用戶,所以發放新用戶專享優惠劵。
- 站內信模塊:因為是新用戶,所以發送新用戶的歡迎語的站內信。
- ... 等等
這樣,我們就可以將注冊成功后的業務拓展邏輯,實現業務上的解耦,未來也更加容易拓展。同時,也提高了注冊接口的性能,避免用戶需要等待業務拓展邏輯執行完成后,才響應注冊成功。
同時,相同消費者分組的多個實例,可以實現高可用,保證在一個實例意外掛掉的情況下,其它實例能夠頂上。并且,多個實例都進行消費,能夠提升消費速度。
友情提示:如果還不理解的話,沒有關系,我們下面會演示下我們上面舉的例子。
3.2.3 MySink
創建?MySink?接口,聲明名字為 Input Binding。代碼如下:
public interface MySink {String DEMO01_INPUT = "demo01-input";@Input(DEMO01_INPUT)SubscribableChannel demo01Input();}這里,我們通過?@Input?注解,聲明了一個名字為?demo01-input?的 Input Binding。注意,這個名字要和我們配置文件中的?spring.cloud.stream.bindings?配置項對應上。
同時,@Input?注解的方法的返回結果為?SubscribableChannel?類型,可以使用它訂閱消息來消費。MessageChannel 提供的訂閱消息的方法如下:
public interface SubscribableChannel extends MessageChannel {boolean subscribe(MessageHandler handler); // 訂閱boolean unsubscribe(MessageHandler handler); // 取消訂閱}那么,我們是否要實現 MySink 接口呢?答案也是不需要,還是全部交給 Spring Cloud Stream 的?BindableProxyFactory?大兄弟來解決。BindableProxyFactory 會通過動態代理,自動實現 MySink 接口。 而?@Input?注解的方法的返回值,BindableProxyFactory 會掃描帶有?@Input?注解的方法,自動進行創建。
例如說,#demo01Input()?方法被自動創建返回結果為?DirectWithAttributesChannel,它也是 SubscribableChannel 的子類。
友情提示:感興趣的胖友,可以在 BindableProxyFactory 的?#afterPropertiesSet()?和?#invoke(MethodInvocation invocation)?方法上,都打上一個斷點,然后進行愉快的調試。
3.2.4 Demo01Message
創建?Demo01Message?類,示例 Message 消息。
友情提示:和「3.1.4 Demo01Message」基本一樣,點擊?鏈接?查看。
3.2.5 Demo01Consumer
創建?Demo01Consumer?類,消費消息。代碼如下:
@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);}}在方法上,添加?@StreamListener?注解,聲明對應的?Input?Binding。這里,我們使用?MySink.DEMO01_INPUT。
又因為我們消費的消息是 POJO 類型,所以我們需要添加?@Payload?注解,聲明需要進行反序列化成 POJO 對象。
3.2.6 ConsumerApplication
創建?ConsumerApplication?類,啟動應用。代碼如下:
@SpringBootApplication @EnableBinding(MySink.class) public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}}使用?@EnableBinding?注解,聲明指定接口開啟 Binding 功能,掃描其?@Input?和?@Output?注解。這里,我們設置為 MySink 接口。
3.3 測試單集群多實例的場景
本小節,我們會在一個消費者集群啟動兩個實例,測試在集群消費的情況下的表現。
① 執行?ConsumerApplication 兩次,啟動兩個消費者的實例,從而實現在消費者分組?demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。此時在 IDEA 控制臺看到 RocketMQ 相關的日志如下:
2020-02-22 09:32:54.462 INFO 50472 --- [ main] s.b.r.c.RocketMQListenerBindingContainer : running container: RocketMQListenerBindingContainer{consumerGroup='demo01-consumer-group-DEMO-TOPIC-01', nameServer='[127.0.0.1:9876]', topic='DEMO-TOPIC-01', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='null', messageModel=CLUSTERING} 2020-02-22 09:32:54.462 INFO 50472 --- [ main] .c.s.b.r.i.RocketMQInboundChannelAdapter : started com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter@1cd3b138友情提示:因為 IDEA 默認同一個程序只允許啟動 1 次,所以我們需要配置 DemoProviderApplication 為?Allow parallel run。如下圖所示:
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send?接口三次,發送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:
// ConsumerApplication 控制臺 01 2020-02-22 09:39:29.073 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-1682643477}] 2020-02-22 09:41:32.754 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=1890257867}]// ConsumerApplication 控制臺 02 2020-02-22 09:41:32.264 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:80 消息內容:Demo01Message{id=1401668556}]符合預期。從日志可以看出,每條消息僅被消費一次。
3.4 測試多集群多實例的場景
本小節,我們會在二個消費者集群各啟動兩個實例,測試在集群消費的情況下的表現。
① 執行?ConsumerApplication 兩次,啟動兩個消費者的實例,從而實現在消費者分組?demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。
② 修改?labx-06-sca-stream-rocketmq-consumer-demo?項目的配置文件,修改?spring.cloud.stream.bindings.demo01-input.group?配置項,將消費者分組改成?X-demo01-consumer-group-DEMO-TOPIC-01。
然后,執行?ConsumerApplication 兩次,再啟動兩個消費者的實例,從而實現在消費者分組?X-demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。
③ 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send?接口三次,發送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:
// 消費者分組 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 01 2020-02-22 10:17:07.886 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-276398167}] 2020-02-22 10:17:08.237 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-250975158}]// 消費者分組 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 02 2020-02-22 10:17:08.710 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:80 消息內容:Demo01Message{id=412281482}]// 消費者分組 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 01 2020-02-22 10:17:07.887 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:51 消息內容:Demo01Message{id=-276398167}] 2020-02-22 10:17:08.238 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:51 消息內容:Demo01Message{id=-250975158}]// 消費者分組 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 02 2020-02-22 10:17:08.787 INFO 51096 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:77 消息內容:Demo01Message{id=412281482}]符合預期。從日志可以看出,每條消息被每個消費者集群都進行了消費,且僅被消費一次。
3.5 小結
至此,我們已經完成了 Stream RocketMQ 的快速入門,是不是還是蠻簡答的噢。現在胖友可以在回過頭看看 Binder 和 Binding 的概念,是不是就清晰一些了。
4. 定時消息
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-demo
- 消費者:labx-06-sca-stream-rocketmq-consumer-demo
在 RocketMQ 中,提供定時消息的功能。
定時消息,是指消息發到 Broker 后,不能立刻被 Consumer 消費,要到特定的時間點或者等待特定的時間后才能被消費。
不過,RocketMQ 暫時不支持任意的時間精度的延遲,而是固化了 18 個延遲級別。如下表格:
| 1 | 1s | 7 | 3m | 13 | 9m |
| 2 | 5s | 8 | 4m | 14 | 10m |
| 3 | 10s | 9 | 5m | 15 | 20m |
| 4 | 30s | 10 | 6m | 16 | 30m |
| 5 | 1m | 11 | 7m | 17 | 1h |
| 6 | 2m | 12 | 8m | 18 | 2h |
如果胖友想要任一時刻的定時消息,可以考慮借助 MySQL + Job 來實現。又或者考慮使用?DDMQ(滴滴打車基于 RocketMQ 和 Kafka 改造的開源消息隊列)。
下面,我們來搭建一個 RocketMQ 定時消息的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,修改?labx-06-sca-stream-rocketmq-producer-demo?發送定時消息,繼續使用?labx-06-sca-stream-rocketmq-consumer-demo?消費消息。
4.1 Demo01Controller
修改?Demo01Controller?類,增發送定時消息的 HTTP 接口。代碼如下:
private Logger logger = LoggerFactory.getLogger(getClass());@GetMapping("/send_delay") public boolean sendDelay() {// 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // <X> 設置延遲級別為 3,10 秒后消費。.build();// 發送消息boolean sendResult = mySource.demo01Output().send(springMessage);logger.info("[sendDelay][發送消息完成, 結果 = {}]", sendResult);return sendResult; }在?<X>?處,通過添加頭?MessageConst.PROPERTY_DELAY_TIME_LEVEL,設置消息的延遲級別,從而發送定時消息。
4.2 簡單測試
① 執行?ConsumerApplication,啟動消費者的實例。
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send_delay?接口,發送延遲 10 秒的定時消息。IDEA 控制臺輸出日志如下:
// Producer 的控制臺 2020-02-22 16:32:35.836 INFO 57143 --- [io-18080-exec-5] c.i.s.l.r.p.controller.Demo01Controller : [sendDelay][發送消息完成, 結果 = true]// Consumer 的控制臺 2020-02-22 16:32:45.841 INFO 57133 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:61 消息內容:Demo01Message{id=618574636}]符合預期。在 Producer 發送的消息之后,Consumer 確實 10 秒后才消費消息。
5. 消費重試
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-demo
- 消費者:labx-06-sca-stream-rocketmq-consumer-retry
RocketMQ 提供消費重試的機制。在消息消費失敗的時候,RocketMQ 會通過消費重試機制,重新投遞該消息給 Consumer ,讓 Consumer 有機會重新消費消息,實現消費成功。
當然,RocketMQ 并不會無限重新投遞消息給 Consumer 重新消費,而是在默認情況下,達到 16 次重試次數時,Consumer 還是消費失敗時,該消息就會進入到死信隊列。
死信隊列用于處理無法被正常消費的消息。當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。
RocketMQ 將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。在 RocketMQ 中,可以通過使用 console 控制臺對死信隊列中的消息進行重發來使得消費者實例再次進行消費。
每條消息的失敗重試,是有一定的間隔時間。實際上,消費重試是基于「5. 定時消息」?來實現,第一次重試消費按照延遲級別為?3?開始。😈 所以,默認為 16 次重試消費,也非常好理解,畢竟延遲級別最高為 18 呀。
不過要注意,只有集群消費模式下,才有消息重試。
下面,我們來搭建一個 RocketMQ 消息重試的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,使用?labx-06-sca-stream-rocketmq-producer-demo?發送消息,從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-retry?來模擬消費失敗后的重試。
5.1 復制項目
將「2. 快速入門」小節的?labx-06-sca-stream-rocketmq-consumer-demo,復制出?labx-06-sca-stream-rocketmq-consumer-retry。
5.2 配置文件
修改?application.yml?配置文件,增加消費重試相關的兩個配置項?delay-level-when-next-consume?和?max-attempts。最終配置如下:
spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Consumer 配置項,對應 ConsumerProperties 類consumer:max-attempts: 1# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費delay-level-when-next-consume: 0 # 異步消費消息模式下消費失敗重試策略,默認為 0server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者① 對于?delay-level-when-next-consume?配置項,一共有三種選擇:
- -1:不重復,直接放入死信隊列
- 0:RocketMQ Broker 控制重試策略
-
0:RocketMQ Consumer 控制重試策略
可能胖友對 Broker 和 Consumer 控制重試策略有點懵逼!?每天消息首次消費失敗時,Consumer 會發回給 Broker,并告訴 Broker 按照什么延遲級別開始,不斷重新投遞給 Consumer 直到消費成功或者到達最大延遲級別。
舉個例子,如果這里我們設置了?delay-level-when-next-consume?配置項為 18,則 2 小時后 Broker 會投遞該消息給 Consumer 進行重新消費。
一般情況下,我們設置?delay-level-when-next-consume?配置項為 0 即可,使用 Broker 控制重試策略即可。默認配置下,Broker 會使用延遲級別從 3 開始,10 秒后 Broker 會投遞該消息給 Consumer 進行重新消費。
② 對于?max-attempts?配置項,每次拉取到消息到本地時,如果消費重試,本地重試的最大總次數(包括第一次)。這個是 Spring Cloud Stream 提供的通用消費重試功能,是?Consumer?級別的,而 RocketMQ 提供的獨有消費重試功能,是?Broker?級別的。
因為 Spring Cloud Stream 提供的重試間隔,是通過 sleep 實現,會占掉當前線程,影響 Consumer 的消費速度,所以這里并不推薦使用,因此設置?max-attempts?配置項為 1,禁用 Spring Cloud Stream 提供的重試功能,使用 RocketMQ 提供的重試功能。
友情提示:如果胖友無法保證消費重試不會帶來副作用,也就是說無法保證消費的冪等性,建議關閉消費重試功能,即設置?delay-level-when-next-consume?配置項為 -1,max-attempts?配置項為 1。
5.3 Demo01Consumer
修改?Demo01Consumer?類,在消費消息時拋出異常,從而模擬消費錯誤。代碼如下:
@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);// <X> 注意,此處拋出一個 RuntimeException 異常,模擬消費失敗throw new RuntimeException("我就是故意拋出一個異常");}}5.4 簡單測試
① 執行?ConsumerApplication,啟動消費者的實例。
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send?接口,發送一條消息。IDEA 控制臺輸出日志如下:
// Demo01Consumer 第一次消費失敗,拋出 RuntimeException 異常 2020-02-22 19:18:52.241 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:69 消息內容:Demo01Message{id=-604160799}] 2020-02-22 19:18:52.245 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略// Demo01Consumer 第一次重試消費失敗,拋出 RuntimeException 異常。間隔了 10 秒,對應延遲級別 3 。 2020-02-22 19:19:02.259 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:69 消息內容:Demo01Message{id=-604160799}] 2020-02-22 19:19:02.259 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略// Demo01Consumer 第二次重試消費失敗,拋出 RuntimeException 異常。間隔了 30 秒,對應延遲級別 4 。 2020-02-22 19:19:32.266 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:69 消息內容:Demo01Message{id=-604160799}] 2020-02-22 19:19:32.266 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略// ... 省略,后續還有重試符合預期。從日志中,我們可以看到,消息因為消費失敗后,又重試消費了多次。
6. 消費異常處理機制
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-demo
- 消費者:labx-06-sca-stream-rocketmq-consumer-error-handler
在 Spring Cloud Stream 中,提供了通用的消費異常處理機制,可以攔截到消費者消費消息時發生的異常,進行自定義的處理邏輯。
下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例。考慮方便,我們直接復用「5. 消費重試」小節的項目,使用?labx-06-sca-stream-rocketmq-producer-demo?發送消息,從?labx-06-sca-stream-rocketmq-consumer-retry?復制出?labx-06-sca-stream-rocketmq-consumer-error-handler?來演示消費異常處理機制。
6.1 復制項目
將「5. 消費重試」小節的?labx-06-sca-stream-rocketmq-consumer-retry,復制出?labx-06-sca-stream-rocketmq-consumer-error-handler。
6.2 Demo01Consumer
修改?Demo01Consumer?類,增加消費異常處理方法。完整代碼如下:
@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT) // 對應 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);// <X> 注意,此處拋出一個 RuntimeException 異常,模擬消費失敗throw new RuntimeException("我就是故意拋出一個異常");}@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")public void handleError(ErrorMessage errorMessage) {logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());logger.error("[handleError][headers:{}]", errorMessage.getHeaders());}@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannelpublic void globalHandleError(ErrorMessage errorMessage) {logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());}}① 在 Spring Integration 的設定中,若?#onMessage(@Payload Demo01Message message)?方法消費消息發生異常時,會發送錯誤消息(ErrorMessage)到對應的錯誤 Channel(<destination>.<group>.errors)中。同時,所有錯誤 Channel 都橋接到了 Spring Integration 定義的全局錯誤 Channel(errorChannel)。
友情提示:先暫時記住 Spring Integration 這樣的設定,艿艿也沒去深究 T T,也是一臉懵逼。
因此,我們有兩種方式來實現異常處理:
- 局部的異常處理:通過訂閱指定錯誤 Channel
- 全局的異常處理:通過訂閱全局錯誤 Channel
② 在?#handleError(ErrorMessage errorMessage)?方法上,我們聲明了?@ServiceActivator?注解,訂閱指定錯誤 Channel的錯誤消息,實現?#onMessage(@Payload Demo01Message message)?方法的局部異常處理。如下圖所示:
③ 在?#globalHandleError(ErrorMessage errorMessage)?方法上,我們聲明了?@StreamListener?注解,訂閱全局錯誤 Channel的錯誤消息,實現全局異常處理。
④ 在全局和局部異常處理都定義的情況下,錯誤消息僅會被符合條件的局部錯誤異常處理。如果沒有符合條件的,錯誤消息才會被全局異常處理。
6.3 簡單測試
① 執行?ConsumerApplication,啟動消費者的實例。
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send?接口,發送一條消息。IDEA 控制臺輸出日志如下:
// onMessage 方法 2020-02-20 00:47:34.487 INFO 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:60 消息內容:Demo01Message{id=-317670393}]// handleError 方法 2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][payload:RuntimeException: 我就是故意拋出一個異常] 2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][originalMessage:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=3, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=0, rocketmq_MESSAGE_ID=0A258102FE8918B4AAC2620411310017, rocketmq_SYS_FLAG=0, id=dc6dafb1-b303-7931-5977-45f319b935d9, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, rocketmq_BORN_TIMESTAMP=1582130833713, timestamp=1582130854444}]] 2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][headers:{id=cdf37b5d-878c-3d85-1f40-7711a3642a16, timestamp=1582130854489}]不過要注意,如果異常處理方法成功,沒有重新拋出異常,會認定為該消息被消費成功,所以就不會進行消費重試。
7. 廣播消費
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-demo
- 消費者:labx-06-sca-stream-rocketmq-consumer-broadcasting
在上述的示例中,我們看到的都是使用集群消費,也是最常用的消費模式。而在一些場景下,我們需要使用廣播消費。
廣播消費模式下,相同 Consumer Group 的每個 Consumer 實例都接收全量的消息。
例如說,在應用中,緩存了數據字典等配置表在內存中,可以通過 RocketMQ 廣播消費,實現每個應用節點都消費消息,刷新本地內存的緩存。
又例如說,我們基于 WebSocket 實現了 IM 聊天,在我們給用戶主動發送消息時,因為我們不知道用戶連接的是哪個提供 WebSocket 的應用,所以可以通過 RocketMQ 廣播消費,每個應用判斷當前用戶是否是和自己提供的 WebSocket 服務連接,如果是,則推送消息給用戶。
下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,使用?labx-06-sca-stream-rocketmq-producer-demo?發送消息,從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting?來演示廣播消費。
7.1 復制項目
將「2. 快速入門」小節的?labx-06-sca-stream-rocketmq-consumer-demo,復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting。
7.2 配置文件
修改?application.yml?配置文件,設置?broadcasting?配置項為?true,開啟廣播消費的模式。完整配置如下:
spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01-X # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: true # 是否使用廣播消費,默認為 false 使用集群消費server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者7.3 簡單測試
① 執行?ConsumerApplication 兩次,啟動兩個消費者的實例,從而實現在消費者分組?demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send?接口三次,發送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:
// ConsumerApplication 控制臺 01 2020-02-20 01:20:06.886 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-335590634}] 2020-02-20 01:20:18.368 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=283364059}] 2020-02-20 01:20:24.422 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-1253930234}]// ConsumerApplication 控制臺 02 2020-02-20 01:20:06.884 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:75 消息內容:Demo01Message{id=-335590634}] 2020-02-20 01:20:18.368 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:75 消息內容:Demo01Message{id=283364059}] 2020-02-20 01:20:24.422 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:75 消息內容:Demo01Message{id=-1253930234}]符合預期。從日志可以看出,每條消息僅被每個消費者消費了一次。
8. 順序消息
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-orderly
- 消費者:labx-06-sca-stream-rocketmq-consumer-orderly
RocketMQ 提供了兩種順序級別:
- 普通順序消息:Producer 將相關聯的消息發送到相同的消息隊列。
- 完全嚴格順序:在【普通順序消息】的基礎上,Consumer 嚴格順序消費。
官方文檔是這么描述的:
消息有序,指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了三條消息分別是訂單創建、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以并行消費的。RocketMQ 可以嚴格的保證消息有序。
順序消息分為全局順序消息與分區順序消息,全局順序是指某個 Topic 下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可。
- 全局順序:對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。適用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景
- 分區順序:對于指定的一個 Topic,所有消息根據 Sharding key 進行區塊分區。 同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。適用場景:性能要求高,以 Sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息發布和消費的場景。
注意,分區順序就是普通順序消息,全局順序就是完全嚴格順序。
下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:
- 從?labx-06-sca-stream-rocketmq-producer-demo?復制出?labx-06-sca-stream-rocketmq-producer-orderly?來演示發送順序消息
- 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting?來演示順序消費消息
8.1 搭建生產者
從?labx-06-sca-stream-rocketmq-producer-demo?復制出?labx-06-sca-stream-rocketmq-producer-orderly?來演示發送順序消息。
8.1.1 配置文件
修改?application.yml?配置文件,添加?partition-key-expression?配置項,設置 Producer 發送順序消息的 Sharding key。完整配置如下:
spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSON# Producer 配置項,對應 ProducerProperties 類producer:partition-key-expression: payload['id'] # 分區 key 表達式。該表達式基于 Spring EL,從消息中獲得分區 key。# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: test # 生產者分組sync: true # 是否同步發送消息,默認為 false 異步。server:port: 18080①?partition-key-expression?配置項,該表達式基于?Spring EL,從消息中獲得 Sharding key。
這里,我們設置該配置項為?payload['id'],表示從 Spring?Message?的 payload 的?id。稍后我們發送的消息的 payload 為 Demo01Message,那么?id?就是?Demo01Message.id。
如果我們想從消息的 headers 中獲得 Sharding key,可以設置為?headers['partitionKey']。
② Spring Cloud Stream 使用?PartitionHandler?進行 Sharding key 的獲得與計算,最終 Sharding key 的結果為?key.hashCode() % partitionCount。
感興趣的胖友,可以閱讀 PartitionHandler 的?#determinePartition(Message<?> message)?方法。
在獲取到 Sharding key 之后,Spring Cloud Alibaba Stream RocketMQ 提供的?PartitionMessageQueueSelector?選擇消息發送的隊列。
我們以發送一條?id?為 1 的 Demo01Message 消息為示例,最終會發送到對應 RocketMQ Topic 的隊列為 1。計算過程如下:
// 第一步,PartitionHandler 使用 `partition-key-expression` 表達式,從 Message 中獲得 Sharding key key => 1// 第二步,PartitionHandler 計算最終的 Sharding key // 默認情況下,每個 RocketMQ Topic 的隊列總數是 4。 key => key.hashCode() % partitionCount = 1.hashCode() % 4 = 1 % 4 = 1// 第三步,PartitionMessageQueueSelector 獲得對應 RocketMQ Topic 的隊列 隊列 => queues.get(key) = queues.get(1)這樣,我們就能保證相同 Sharding Key?的消息,發送到相同的對應 RocketMQ Topic 的隊列中。當前,前提是該 Topic 的隊列總數不能變噢,不然計算的 Sharding Key 會發生變化。
8.1.2 Demo01Controller
修改?Demo01Controller?類,增加發送 3 條順序消息的 HTTP 接口。代碼如下:
@GetMapping("/send_orderly") public boolean sendOrderly() {// 發送 3 條相同 id 的消息int id = new Random().nextInt();for (int i = 0; i < 3; i++) {// 創建 MessageDemo01Message message = new Demo01Message().setId(id);// 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();// 發送消息mySource.demo01Output().send(springMessage);}return true; }每次發送的 3 條消息使用相同的?id,配合上我們使用它作為 Sharding key,就可以發送對應 Topic 的相同隊列中。
另外,整列發送的雖然是順序消息,但是和發送普通消息的代碼是一模一樣的。
8.2 搭建消費者
從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting?來演示順序消費消息
8.2.1 配置文件
修改?application.yml?配置文件,添加?orderly?配置項,設置 Consumer 順序消費消息。完整配置如下:
spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費orderly: true # 是否順序消費,默認為 false 并發消費。server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者8.2.2 Demo01Consumer
修改?Demo01Consumer?類,在消費消息時,打印出消息所在隊列編號和線程編號,這樣我們通過隊列編號可以判斷消息是否順序發送,通過線程編號可以判斷消息是否順序消費。代碼如下:
@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(Message<?> message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);}}8.3 簡單測試
① 執行?ConsumerApplication,啟動消費者的實例。
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send_orderly?接口,發送順序消息。IDEA 控制臺輸出日志如下:
2020-02-20 21:26:52.044 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212037, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD850006, rocketmq_SYS_FLAG=0, id=945725a1-abfb-218a-d480-b220adff9549, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212044}]] 2020-02-20 21:26:52.046 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212039, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD870007, rocketmq_SYS_FLAG=0, id=86a0e912-3cba-8b5b-3928-a7ef0ad80036, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212046}]] 2020-02-20 21:26:52.046 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212041, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD890008, rocketmq_SYS_FLAG=0, id=b04416a3-60c2-bf42-a5a4-fe3c5079cc55, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212046}]]id?為 58569988 的消息被發送到 RocketMQ 消息隊列編號為 0,并且在線程編號為 76 的線程中消費。😈 胖友可以自己在多調用幾次接口,繼續嘗試。
9. 消息過濾
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-demo
- 消費者:labx-06-sca-stream-rocketmq-consumer-filter
RocketMQ 提供了兩種方式給 Consumer 進行消息的過濾:
-
基于 Tag 過濾
標簽(Tag):為消息設置的標志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優化 RocketMQ 提供的查詢系統。消費者可以根據 Tag 實現對不同子主題的不同消費邏輯,實現更好的擴展性。
-
基于?SQL92?過濾
消息過濾目前是在?Broker?端實現的,優點是減少了 Broker 和 Consumer 之間的無用消息的網絡傳輸,缺點是增加了 Broker 的負擔、而且實現相對復雜。
一般情況下,我們使用 Tag 過濾較多,我們來搭建一個 RocketMQ 使用 Tag 進行消息過濾的示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:
- 修改?labx-06-sca-stream-rocketmq-producer-demo?發送帶有 Tag 的消息。
- 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter?來使用 Tag 過濾消息來消費。
先搭建消費者。
9.1 Demo01Controller
修改?Demo01Controller?類,增加發送 3 條帶 Tag 的消息的 HTTP 接口。代碼如下:
@GetMapping("/send_tag") public boolean sendTag() {for (String tag : new String[]{"yunai", "yutou", "tudou"}) {// 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_TAGS, tag) // <X> 設置 Tag.build();// 發送消息mySource.demo01Output().send(springMessage);}return true; }在?<X>?處,通過添加頭?MessageConst.PROPERTY_TAGS,設置發送消息的?Tag。
再搭建消費者。
9.2 復制項目
從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter?來使用 Tag 過濾消息來消費。
9.3 配置文件
修改?application.yml?配置文件,設置?tags?配置項為?yunai || yutou,只消費帶有 Tag 為?yunai?或?yutou?的消息。完整配置如下:
spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費tags: yunai || yutou # 基于 Tag 訂閱,多個 Tag 使用 || 分隔,默認為空sql: # 基于 SQL 訂閱,默認為空server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者如果胖友想要基于 SQL92 過濾消息,可以通過設置?sql?配置項。
9.4 簡單測試
① 執行?ConsumerApplication,啟動消費者的實例。
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send_tag?接口,發送帶有 Tag 的消息。IDEA 控制臺輸出日志如下:
2020-02-20 22:41:57.639 INFO 81013 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:Demo01Message{id=687868446}] 2020-02-20 22:41:57.641 INFO 81013 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:Demo01Message{id=1088622557}]只消費了兩條消息,目測 Tag 為?tudou?的消息已經被過濾了。要注意,被過濾掉的消息,后續是無法被消費掉了,效果和消費成功是一樣的。
9.5 Demo01Consumer
咳咳咳:不知道如何取這標題,暫時用這個噶。
上面我們看到的是 RocketMQ?獨有的?Broker級別的消息過濾機制,而 Spring Cloud Stream 提供了通用的?Consumer?級別的效率過濾器機制。我們只需要使用?@StreamListener?注解的?condition?屬性,設置消息滿足指定 Spring EL 表達式的情況下,才進行消費。
| > /** > * A condition that must be met by all items that are dispatched to this method. > * @return a SpEL expression that must evaluate to a {@code boolean} value. > */ > String condition() default ""; > |
修改?Demo01Consumer?類,使用?@StreamListener?注解的?condition?屬性來過濾消息。代碼如下:
@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());// @StreamListener(MySink.DEMO01_INPUT) // public void onMessage(@Payload Demo01Message message) { // logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message); // }@StreamListener(value = MySink.DEMO01_INPUT, condition = "headers['rocketmq_TAGS'] == 'yunai'")public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);}}這里我們設置消息的 Header 帶有的?rocketmq_TAGS?值為?yunai?時,才進行消費。
9.6 再次測試
① 執行?ConsumerApplication,啟動消費者的實例。
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send_tag?接口,發送帶有 Tag 的消息。IDEA 控制臺輸出日志如下:
/ Tag 為 `yunai` 的消息被消費 2020-02-20 22:59:11.597 INFO 81438 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:64 消息內容:Demo01Message{id=124549390}]// Tag 為 `yutou` 的消息被過濾 2020-02-20 22:59:11.599 WARN 81438 --- [MessageThread_1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: 5edff575-b9a7-e011-154a-532077994685只消費了一條消息,目測 Tag 為?tudou?的消息被 Broker 過濾,Tag 為?yutou?的消息被 Consumer 過濾。要注意,被過濾掉的消息,后續是無法被消費掉了,效果和消費成功是一樣的。
10. 事務消息
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-transaction
- 消費者:labx-06-sca-stream-rocketmq-consumer-demo
在分布式消息隊列中,目前唯一提供完整的事務消息的,只有 RocketMQ 。關于這一點,還是可以鼓吹下的。
可能會有胖友怒噴艿艿,RabbitMQ 和 Kafka 也有事務消息啊,也支持發送事務消息的發送,以及后續的事務消息的 commit提交或 rollbackc 回滾。但是要考慮一個極端的情況,在本地數據庫事務已經提交的時時候,如果因為網絡原因,又或者崩潰等等意外,導致事務消息沒有被 commit ,最終導致這條事務消息丟失,分布式事務出現問題。
相比來說,RocketMQ 提供事務回查機制,如果應用超過一定時長未 commit 或 rollback 這條事務消息,RocketMQ 會主動回查應用,詢問這條事務消息是 commit 還是 rollback ,從而實現事務消息的狀態最終能夠被 commit 或是 rollback ,達到最終事務的一致性。
這也是為什么艿艿在上面專門加粗“完整的”三個字的原因。可能上述的描述,對于絕大多數沒有了解過分布式事務的胖友,會比較陌生,所以推薦閱讀如下兩篇文章:
- 《阿里云消息隊列 MQ —— 事務消息》
- 《芋道 RocketMQ 源碼解析 —— 事務消息》
熱心的艿艿:雖然說 RabbitMQ、Kafka 并未提供完整的事務消息,但是社區里,已經基于它們之上拓展,提供了事務回查的功能。例如說:Myth?,采用消息隊列解決分布式事務的開源框架, 基于 Java 語言來開發(JDK1.8),支持 Dubbo,Spring Cloud,Motan 等 RPC 框架進行分布式事務。
下面,我們來搭建一個 RocketMQ 定時消息的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,修改?labx-06-sca-stream-rocketmq-producer-transaction?發送事務消息,繼續使用?labx-06-sca-stream-rocketmq-consumer-demo?消費消息。
10.1 復制項目
從?labx-06-sca-stream-rocketmq-producer-demo?復制出?labx-06-sca-stream-rocketmq-producer-transaction?來發送事務消息。
10.2 配置文件
修改?application.yml?配置文件,添加?transactional?配置項為?true,設置 Producer 發送事務消息。完整配置如下:
spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSON# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: test # 生產者分組sync: true # 是否同步發送消息,默認為 false 異步。transactional: true # 是否發送事務消息,默認為 false。server:port: 1808010.3 Demo01Controller
修改?Demo01Controller?類,增加發送事務消息的 HTTP 接口。代碼如下:
@GetMapping("/send_transaction") public boolean sendTransaction() {// 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// 創建 Spring Message 對象Args args = new Args().setArgs1(1).setArgs2("2");Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader("args", JSON.toJSONString(args)) // <X>.build();// 發送消息return mySource.demo01Output().send(springMessage); }public static class Args { // 這里作為示例,所以直接這么寫了private Integer args1;private String args2;// ... 省略 setter、getter、toString 方法 }因為 Spring Cloud Stream 在設計時,并沒有考慮事務消息,所以我們只好在?<X>?處,通過 Header 傳遞參數。
又因為 Header 后續會被轉換成 String 類型,導致我們無法獲得正確的真實的原始參數,所以這里我們先使用 JSON 將?args?參數序列化成字符串,這樣后續我們可以使用 JSON 反序列化回來。
10.4 TransactionListenerImpl
創建?TransactionListenerImpl?類,實現 MQ 事務的監聽。代碼如下:
@RocketMQTransactionListener(txProducerGroup = "test") public class TransactionListenerImpl implements RocketMQLocalTransactionListener {private Logger logger = LoggerFactory.getLogger(getClass());@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 從消息 Header 中解析到 args 參數,并使用 JSON 反序列化Demo01Controller.Args args = JSON.parseObject(msg.getHeaders().get("args", String.class),Demo01Controller.Args.class);// ... local transaction process, return rollback, commit or unknownlogger.info("[executeLocalTransaction][執行本地事務,消息:{} args:{}]", msg, args);return RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// ... check transaction status and return rollback, commit or unknownlogger.info("[checkLocalTransaction][回查消息:{}]", msg);return RocketMQLocalTransactionState.COMMIT;}}① 在類上,添加?@RocketMQTransactionListener?注解,聲明監聽器的是生產者分組是?"test"?的 Producer 發送的事務消息。因為 RocketMQ 是回查(請求)指定指定生產分組下的 Producer,從而獲得事務消息的狀態,所以一定要正確設置。
② 實現?RocketMQLocalTransactionListener?接口,實現執行本地事務和檢查本地事務的方法。
③ 實現?#executeLocalTransaction(...)?方法,實現執行本地事務。
-
注意,這是一個模板方法。在調用這個方法之前,Spring Cloud Alibaba Stream RocketMQ 已經使用 Producer 發送了一條事務消息。然后根據該方法執行的返回的?RocketMQLocalTransactionState?結果,提交還是回滾該事務消息。
友情提示:感興趣的胖友,可以看看 DefaultMQProducerImpl 的?#sendMessageInTransaction(...)?的源碼,整個模板方法是怎么執行的。
-
😈 這里,我們為了模擬 RocketMQ 回查 Producer 來獲得事務消息的狀態,所以返回了?RocketMQLocalTransactionState.UNKNOWN?未知狀態。
④ 實現?#checkLocalTransaction(...)?方法,檢查本地事務。
- 在事務消息長事件未被提交或回滾時,RocketMQ 會回查事務消息對應的生產者分組下的 Producer ,獲得事務消息的狀態。此時,該方法就會被調用。
- 😈 這里,我們直接返回?RocketMQLocalTransactionState.COMMIT?提交狀態。
一般來說,有兩種方式實現本地事務回查時,返回事務消息的狀態。
第一種,通過?msg?消息,獲得某個業務上的標識或者編號,然后去數據庫中查詢業務記錄,從而判斷該事務消息的狀態是提交還是回滾。
第二種,記錄?msg?的事務編號,與事務狀態到數據庫中。
- 第一步,在?#executeLocalTransaction(...)?方法中,先存儲一條?id?為?msg?的事務編號,狀態為?RocketMQLocalTransactionState.UNKNOWN?的記錄。
- 第二步,調用帶有事務的業務 Service 的方法。在該 Service 方法中,在邏輯都執行成功的情況下,更新?id?為?msg?的事務編號,狀態變更為?RocketMQLocalTransactionState.COMMIT?。這樣,我們就可以伴隨這個事務的提交,更新?id?為?msg?的事務編號的記錄的狀為?RocketMQLocalTransactionState.COMMIT?,美滋滋。。
- 第三步,要以?try-catch?的方式,調用業務 Service 的方法。如此,如果發生異常,回滾事務的時候,可以在?catch?中,更新?id?為?msg?的事務編號的記錄的狀態為?RocketMQLocalTransactionState.ROLLBACK?。😭 極端情況下,可能更新失敗,則打印 error 日志,告警知道,人工介入。
- 如此三步之后,我們在?#executeLocalTransaction(...)?方法中,就可以通過查找數據庫,id?為?msg?的事務編號的記錄的狀態,然后返回。
相比來說,艿艿傾向第二種,實現更加簡單通用,對于業務開發者,更加友好。和有幾個朋友溝通了下,他們也是采用第二種。
10.5 簡單測試
① 執行?ConsumerApplication,啟動消費者的實例。
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send_transaction?接口,發送事務消息。IDEA 控制臺輸出日志如下:
// ProduerApplication 控制臺 // ### TransactionListenerImpl 執行 executeLocalTransaction 方法,先執行本地事務的邏輯 2020-02-21 00:14:08.773 INFO 83052 --- [io-18080-exec-1] c.i.s.l.r.p.l.TransactionListenerImpl : [executeLocalTransaction][執行本地事務,消息:GenericMessage [payload=byte[17], headers={args={"args1":1,"args2":"2"}, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=0A258102446C18B4AAC2670C237B0000, id=d8604733-9083-5d19-15b4-bda0c549e9d1, contentType=application/json, timestamp=1582215248772}] args:Args{args1=1, args2='2'}] // ### Producer 發送事務消息成功,但是因為 executeLocalTransaction 方法返回的是 UNKOWN 狀態,所以事務消息并未提交或者回滾 // ### RocketMQ Broker 在發送事務消息 30 秒后,發現事務消息還未提交或是回滾,所以回查 Producer 。此時,checkLocalTransaction 方法返回 COMMIT ,所以該事務消息被提交 2020-02-21 00:14:48.685 INFO 83052 --- [pool-1-thread-1] c.i.s.l.r.p.l.TransactionListenerImpl : [checkLocalTransaction][回查消息:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=1, rocketmq_BORN_TIMESTAMP=1582215248763, args={"args1":1,"args2":"2"}, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=0A25810200002A9F000000000002868F, rocketmq_TRANSACTION_ID=0A258102446C18B4AAC2670C237B0000, rocketmq_SYS_FLAG=0, id=62383992-5015-f957-41e7-75ec5ace4496, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582215288685}]]// ConsumerApplication 控制臺 // ### 事務消息被提交,所以該消息被 Consumer 消費 2020-02-21 00:14:48.756 INFO 83058 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:79 消息內容:Demo01Message{id=1950986029}]整個的執行過程,看看艿艿在日志上添加的說明。
11. 監控端點
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-actuator
- 消費者:labx-06-sca-stream-rocketmq-consumer-actuator
Spring Cloud Stream 的?endpoint?模塊,基于 Spring Boot Actuator,提供了自定義監控端點?bindings?和?channels,用于獲取 Spring Cloud Stream 的 Binding 和 Channel 信息。
同時,Spring Cloud Alibaba Stream RocketMQ 拓展了 Spring Boot Actuator 內置的?health?端點,通過自定義的?RocketMQBinderHealthIndicator,獲取 RocketMQ 客戶端的健康狀態。
友情提示:對 Spring Boot Actuator 不了解的胖友,可以后續閱讀《芋道 Spring Boot 監控端點 Actuator 入門》文章。
我們來搭建一個 Stream RocketMQ 監控端點的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:
- 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-actuator,查看生產者的監控端點結果。
- 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter,查看消費者的監控端點結果。
11.1 搭建生產者
從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-actuator,查看生產者的監控端點結果。
11.1.1 引入依賴
在?pom.xml?文件中,額外引入 Spring Boot Actuator 相關依賴。代碼如下:
<!-- 實現對 Actuator 的自動化配置 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId> </dependency>11.1.2 配置文件
修改?application.yaml?配置文件,額外增加 Spring Boot Actuator 配置項。配置如下:
management:endpoints:web:exposure:include: '*' # 需要開放的端點。默認值只打開 health 和 info 兩個端點。通過設置 * ,可以開放所有端點。endpoint:# Health 端點配置項,對應 HealthProperties 配置類health:enabled: true # 是否開啟。默認為 true 開啟。show-details: ALWAYS # 何時顯示完整的健康信息。默認為 NEVER 都不展示。可選 WHEN_AUTHORIZED 當經過授權的用戶;可選 ALWAYS 總是展示。每個配置項的作用,胖友看下艿艿添加的注釋。如果還不理解的話,后續看下《芋道 Spring Boot 監控端點 Actuator 入門》文章。
11.1.3 簡單測試
① 使用 ProducerApplication 啟動生產者。
② 訪問應用的?bindings?監控端點?http://127.0.0.1:18080/actuator/bindings,返回結果如下圖:
③ 訪問應用的?channels?監控端點?http://127.0.0.1:18080/actuator/channels,返回結果如下圖:
④ 訪問應用的?health?監控端點?http://127.0.0.1:18080/actuator/health,返回結果如下圖:
11.2 搭建消費者
從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter,查看消費者的監控端點結果。
11.2.1 引入依賴
在?pom.xml?文件中,額外引入 Spring Boot Actuator 相關依賴。代碼如下:
<!-- 實現對 Actuator 的自動化配置 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId> </dependency>11.2.2 配置文件
修改?application.yaml?配置文件,額外增加 Spring Boot Actuator 配置項。配置如下:
management:endpoints:web:exposure:include: '*' # 需要開放的端點。默認值只打開 health 和 info 兩個端點。通過設置 * ,可以開放所有端點。endpoint:# Health 端點配置項,對應 HealthProperties 配置類health:enabled: true # 是否開啟。默認為 true 開啟。show-details: ALWAYS # 何時顯示完整的健康信息。默認為 NEVER 都不展示。可選 WHEN_AUTHORIZED 當經過授權的用戶;可選 ALWAYS 總是展示。每個配置項的作用,胖友看下艿艿添加的注釋。如果還不理解的話,后續看下《芋道 Spring Boot 監控端點 Actuator 入門》文章。
112.3 簡單測試
① 使用 ConsumerApplication 啟動消費者,隨機端口為 19541。
② 訪問應用的?bindings?監控端點?http://127.0.0.1:19541/actuator/bindings,返回結果如下圖:
③ 訪問應用的?channels?監控端點?http://127.0.0.1:19541/actuator/channels,返回結果如下圖:
④ 訪問應用的?health?監控端點?http://127.0.0.1:19541/actuator/health,返回結果如下圖:
12. 更多的配置項信息
Spring Cloud Alibaba Stream RocketMQ 提供的配置項挺多的,我們參考文檔將配置項一起梳理下。
RocketMQ Binder Properties
以?spring.cloud.stream.rocketmq.binder?為前綴。
| name-server | RocketMQ NameServer 地址 | 127.0.0.1:9876 |
| access-key | 阿里云賬號 AccessKey | |
| secret-key | 阿里云賬號 SecretKey | |
| enable-msg-trace | 是否為 Producer 和 Consumer 開啟消息軌跡功能 | true |
| customized-trace-topic | 消息軌跡開啟后存儲的 Topic 名稱 | RMQ_SYS_TRACE_TOPIC |
RocketMQ Consumer Properties
以?spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.?為前綴。
| enable | 是否啟用 Consumer | true |
| tags | Consumer 基于 TAGS 訂閱,多個 tag 以?||?分割 | |
| sql | Consumer 基于 SQL 訂閱 | |
| broadcasting | 是Consumer 是否是廣播消費模式。如果想讓所有的訂閱者都能接收到消息,可以使用廣播模式 | false |
| orderly | Consumer 是否同步消費消息模式 | false |
| delayLevelWhenNextConsume | 異步消費消息模式下消費失敗重試策略:-1, 不重復,直接放入死信隊列;0, Broker 控制重試策略;>0, Client 控制重試策略 | 0 |
| suspendCurrentQueueTimeMillis | 同步消費消息模式下消費失敗后再次消費的時間間隔 | 1000 |
RocketMQ Provider Properties
| enable | 是否啟用 Producer | true |
| group | Producer 分組 | |
| maxMessageSize | 消息發送的最大字節數 | 8249344 |
| transactional | 是否發送事務消息 | false |
| sync | 是否使用同步得方式發送消息 | false |
| vipChannelEnabled | 是否在 Vip Channel 上發送消息 | true |
| sendMessageTimeout | 發送消息的超時時間(毫秒) | 3000 |
| compressMessageBodyThreshold | 消息體壓縮閥值(當消息體超過 4k 的時候會被壓縮) | 4096 |
| retryTimesWhenSendFailed | 在同步發送消息的模式下,消息發送失敗的重試次數 | 2 |
| retryTimesWhenSendAsyncFailed | 在異步發送消息的模式下,消息發送失敗的重試次數 | 2 |
| retryNextServer | 消息發送失敗的情況下是否重試其它的 Broker | false |
13.接入阿里云的消息隊列 RocketMQ
示例代碼對應倉庫:
- 生產者:labx-06-sca-stream-rocketmq-producer-aliyun
- 消費者:labx-06-sca-stream-rocketmq-consumer-aliyun
在阿里云上,提供消息隊列?RocketMQ?服務。那么,我們是否能夠使用 Spring Cloud Alibaba Stream RocketMQ 實現阿里云 RocketMQ 的消息的發送與消費呢?
答案是可以。在?《阿里云 —— 消息隊列 MQ —— 開源 Java SDK 接入說明》?中,提到目前開源的 Java SDK 可以接入阿里云 RocketMQ 服務。
如果您已使用開源 Java SDK 進行生產,只需參考方法,重新配置參數,即可實現無縫上云。
前提條件
- 已在阿里云 MQ 控制臺創建資源,包括 Topic、Group ID(GID)、接入點(Endpoint),以及 AccessKeyId 和 AccessKeySecret。
- 已下載開源 RocketMQ 4.5.1 或以上版本,以支持連接阿里云 MQ。
我們來搭建一個 Stream RocketMQ 監控端點的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:
- 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為生產者。
- 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為消費者。
13.1 搭建生產者
從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為生產者。
修改?application.yaml?配置文件,添加?access-key、secret-key?配置項,設置訪問阿里云 RocketMQ 的賬號。完全配置如下:
spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: TOPIC_YUNAI_TEST # 目的地。這里使用 RocketMQ Topic <ALIYUN>content-type: application/json # 內容格式。這里使用 JSON# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 # RocketMQ Namesrv 地址 <ALIYUN>access-key: ${ALIYUN_ACCESS_KEY} # 阿里云賬號 AccessKeysecret-key: ${ALIYUN_SECRET_KEY} # 阿里云賬號 SecretKey# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: GID_PRODUCER_GROUP_YUNAI_TEST # 生產者分組 <ALIYUN>sync: true # 是否同步發送消息,默認為 false 異步。server:port: 18080注意,<ALIYUN>?處的三個配置項,也要修改成阿里云 RocketMQ 的 Namesrv、Topic、Producer Group。
13.2 搭建消費者
從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為消費者。
修改?application.yaml?配置文件,添加?access-key、secret-key?配置項,設置訪問阿里云 RocketMQ 的賬號。完全配置如下:
spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: TOPIC_YUNAI_TEST # 目的地。這里使用 RocketMQ Topic <ALIYUN>content-type: application/json # 內容格式。這里使用 JSONgroup: GID_PRODUCER_GROUP_YUNAI_TEST # 消費者分組 <ALIYUN># Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 # RocketMQ Namesrv 地址 <ALIYUN>access-key: ${ALIYUN_ACCESS_KEY} # 阿里云賬號 AccessKeysecret-key: ${ALIYUN_SECRET_KEY} # 阿里云賬號 SecretKey# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者注意,<ALIYUN>?處的三個配置項,也要修改成阿里云 RocketMQ 的 Namesrv、Topic、Consumer Group。
13.3 簡單測試
① 執行?ConsumerApplication,啟動消費者的實例。
② 執行?ProducerApplication,啟動生產者的實例。
之后,請求?http://127.0.0.1:18080/demo01/send?接口,發送消息。IDEA 控制臺輸出日志如下:
// ConsumerApplication 控制臺 2020-02-21 01:45:16.982 INFO 85901 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:89 消息內容:Demo01Message{id=-724066118}]總結
以上是生活随笔為你收集整理的Spring Cloud Alibaba RocketMQ 快速入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 华为设备路由策略原理与实验
- 下一篇: .NET中代理服务器WebProxy的各