javascript
springboot整合rocketmq_面试官:简单说一下RocketMQ整合SpringBoot吧
前言
在使用SpringBoot的starter集成包時,要特別注意版本。因為SpringBoot集成RocketMQ的starter依賴是由Spring社區提供的,目前正在快速迭代的過程當中,不同版本之間的差距非常大,甚至基礎的底層對象都會經常有改動。例如如果使用rocketmq-spring-boot-starter:2.0.4版本開發的代碼,升級到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了
應用結構
TestController: 測試入口, 有基本消息測試和事務消息測試
TopicListener: 是監聽"topic"這個主題的普通消息監聽器
TopicTransactionListener: 是監聽"topic"這個主題的事務消息監聽器, 和TopicTransactionRocketMQTemplate綁定(一一對應關系)
Customer: 是測試消息體的一個entity對象
TopicTransactionRocketMQTemplate: 是擴展自RocketMQTemplate的另一個RocketMQTemplate, 專門用來處理某一個業務流程, 和TopicTransactionListener綁定(一一對應關系)
pom.xml
org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.1, 引用的springboot版本是2.0.5.RELEASE
<?xml version="1.0" encoding="UTF-8"?>4.0.0com.mrathena.middle.ware rocket.mq.springboot 1.0.0org.springframework.boot spring-boot-dependencies 2.4.0pomimportorg.projectlombok lombok 1.18.12org.slf4j slf4j-api 1.7.30ch.qos.logback logback-classic 1.2.3org.apache.rocketmq rocketmq-spring-boot-starter 2.1.1org.springframework.boot spring-boot-starter org.springframework spring-core org.springframework spring-webmvc org.springframework spring-aop org.springframework spring-context org.springframework spring-messaging com.fasterxml.jackson.core jackson-databind org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test org.springframework spring-messaging com.fasterxml.jackson.core jackson-databind io.springfox springfox-swagger-ui 2.9.2io.springfox springfox-swagger2 2.9.2org.apache.maven.plugins maven-compiler-plugin 3.8.11.81.8UTF-8application.yml
server: servlet: context-path: port: 80rocketmq: name-server: 116.62.162.48:9876 producer: group: producerCustomer
package com.mrathena.rocket.mq.entity;import lombok.AllArgsConstructor;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;@Getter@Setter@NoArgsConstructor@AllArgsConstructorpublic class Customer {private String username;private String nickname;}生產者 TestController
package com.mrathena.rocket.mq.controller;import com.mrathena.rocket.mq.configuration.TopicTransactionRocketMQTemplate;import com.mrathena.rocket.mq.entity.Customer;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.core.MessagePostProcessor;import org.springframework.messaging.support.MessageBuilder;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;import java.util.Map;@Slf4j@RestController@RequestMapping("test")public class TestController {private static final String TOPIC = "topic";@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate TopicTransactionRocketMQTemplate topicTransactionRocketMQTemplate;@GetMapping("base")public Object base() {// destination: topic/topic:tag, topic或者是topic拼接tag的整合體// payload: 荷載即消息體// message: org.springframework.messaging.Message, 是Spring自己封裝的類, 和RocketMQ的Message不是一個類, 里面沒有tags/keys等內容rocketMQTemplate.send(TOPIC, MessageBuilder.withPayload("你好").setHeader("你是誰", "你猜").build());// tags nullrocketMQTemplate.convertAndSend(TOPIC, "tag null");// tags empty, 證明 tag 要么有值要么null, 不存在 empty 的 tagrocketMQTemplate.convertAndSend(TOPIC + ":", "tag empty ?");// 只有 tag 沒有 keyrocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a");rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b");// 有 property, 即 RocketMQ 基礎 API 里面, Message(String topic, String tags, String keys, byte[] body) 里面的 key// rocketmq-spring-boot-starter 把 userProperty 和其他的一些屬性都糅合在 headers 里面可, 具體可以參考 org.apache.rocketmq.spring.support.RocketMQUtil.addUserProperties// 獲取某個自定義的屬性的時候, 直接 headers.get("自定義屬性key") 就可以了Map properties = new HashMap<>();properties.put("property", 1);properties.put("another-property", "你好");rocketMQTemplate.convertAndSend(TOPIC, "property 1", properties);rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 1", properties);rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b property 1", properties);properties.put("property", 5);rocketMQTemplate.convertAndSend(TOPIC, "property 5", properties);rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 5", properties);rocketMQTemplate.convertAndSend(TOPIC + ":c", "tag c property 5", properties);// 消息后置處理器, 可以在發送前對消息體和headers再做一波操作rocketMQTemplate.convertAndSend(TOPIC, "消息后置處理器", new MessagePostProcessor() {/** * org.springframework.messaging.Message */@Overridepublic Message> postProcessMessage(Message> message) {Object payload = message.getPayload();MessageHeaders messageHeaders = message.getHeaders();return message;}});// convertAndSend 底層其實也是 syncSend// syncSendlog.info("{}", rocketMQTemplate.syncSend(TOPIC, "sync send"));// asyncSendrocketMQTemplate.asyncSend(TOPIC, "async send", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("onSuccess");}@Overridepublic void onException(Throwable e) {log.info("onException");}});// sendOneWayrocketMQTemplate.sendOneWay(TOPIC, "send one way");// 這個我還是不太清楚是干嘛的? 跑的時候會報錯!!!//Object receive = rocketMQTemplate.sendAndReceive(TOPIC, "你好", String.class);//log.info("{}", receive);return "success";}@GetMapping("transaction")public Object transaction() {Message message = MessageBuilder.withPayload(new Customer("mrathena", "你是誰")).build();// 這里使用的是通過 @ExtRocketMQTemplateConfiguration(group = "anotherProducer") 擴展出來的另一個 RocketMQTemplatelog.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC, message, null));log.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC + ":tag-a", message, null));return "success";}}配置 TopicTransactionRocketMQTemplate
package com.mrathena.rocket.mq.configuration;import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;import org.apache.rocketmq.spring.core.RocketMQTemplate;/** * 一個事務流程和一個RocketMQTemplate需要一一對應 * 可以通過 @ExtRocketMQTemplateConfiguration(注意該注解有@Component注解) 來擴展多個 RocketMQTemplate * 注意: 不同事務流程的RocketMQTemplate的producerGroup不能相同 * 因為MQBroker會反向調用同一個producerGroup下的某個checkLocalTransactionState方法, 不同流程使用相同的producerGroup的話, 方法可能會調用錯 */@ExtRocketMQTemplateConfiguration(group = "anotherProducer")public class TopicTransactionRocketMQTemplate extends RocketMQTemplate {}消費者 TopicListener
package com.mrathena.rocket.mq.listener;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/** * 最簡單的消費者例子 * topic: 主題 * consumerGroup: 消費者組 * selectorType: 過濾方式, TAG:標簽過濾,僅支持標簽, SQL92:SQL過濾,支持標簽和屬性 * selectorExpression: 過濾表達式, 根據selectorType定, TAG時, 寫標簽如 "a || b", SQL92時, 寫SQL表達式 * consumeMode: CONCURRENTLY:并發消費, ORDERLY:順序消費 * messageModel: CLUSTERING:集群競爭消費, BROADCASTING:廣播消費 */@Slf4j@Component@RocketMQMessageListener(topic = "topic",// 只過濾tag, 不管headers中的key和value//selectorType = SelectorType.TAG,// 必須指定selectorExpression, 可以過濾tag和headers中的key和value//selectorType = SelectorType.SQL92,// 不限tag//selectorExpression = "*",// 不限tag, 和 * 一致//selectorExpression = "",// 只要tag為a的消息//selectorExpression = "a",// 要tag為a或b的消息//selectorExpression = "a || b",// SelectorType.SQL92時, 可以跳過tag, 直接用headers里面的key和value來判斷//selectorExpression = "property = 1",// tag不為null//selectorExpression = "TAGS is not null",// tag為empty, 證明tag不會是empty, 要么有值要么null//selectorExpression = "TAGS = ''",// SelectorType.SQL92時, 即過濾tag, 又過濾headers里面的key和value//selectorExpression = "(TAGS is not null and TAGS = 'a') and (property is not null and property between 4 and 6)",// 并發消費consumeMode = ConsumeMode.CONCURRENTLY,// 順序消費//consumeMode = ConsumeMode.ORDERLY,// 集群消費messageModel = MessageModel.CLUSTERING,// 廣播消費//messageModel = MessageModel.BROADCASTING,consumerGroup = "consumer")public class TopicListener implements RocketMQListener {public void onMessage(String s) {log.info("{}", s);}}消費者 TopicTransactionListener
package com.mrathena.rocket.mq.listener;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.stereotype.Component;@Slf4j@Component@RocketMQTransactionListener(rocketMQTemplateBeanName = "topicTransactionRocketMQTemplate")public class TopicTransactionListener implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {// message: org.springframework.messaging.Message, 是Spring自己封裝的類, 和RocketMQ的Message不是一個類, 里面沒有tags/keys等內容// 一般來說, 并不會在這里處理tags/keys等內容, 而是根據消息體中的某些字段做不同的操作, 第二個參數也可以用來傳遞一些數據到這里log.info("executeLocalTransaction message:{}, object:{}", message, o);log.info("payload: {}", new String((byte[]) message.getPayload()));MessageHeaders headers = message.getHeaders();log.info("tags: {}", headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));log.info("rocketmq_TOPIC: {}", headers.get("rocketmq_TOPIC"));log.info("rocketmq_QUEUE_ID: {}", headers.get("rocketmq_QUEUE_ID"));log.info("rocketmq_MESSAGE_ID: {}", headers.get("rocketmq_MESSAGE_ID"));log.info("rocketmq_TRANSACTION_ID: {}", headers.get("rocketmq_TRANSACTION_ID"));log.info("TRANSACTION_CHECK_TIMES: {}", headers.get("TRANSACTION_CHECK_TIMES"));log.info("id: {}", headers.get("id"));return RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {log.info("checkLocalTransaction message:{}", message);// 在調用了checkLocalTransaction后, 另一個常規消息監聽器才能收到消息return RocketMQLocalTransactionState.COMMIT;}}最后
歡迎關注小編后,可以私信小編【666】即可領取一線大廠Java面試題總結+各知識點學習思維導+一份300頁pdf文檔的Java核心知識點總結!
總結
以上是生活随笔為你收集整理的springboot整合rocketmq_面试官:简单说一下RocketMQ整合SpringBoot吧的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pandas尾部添加一条_Numpy与P
- 下一篇: python与机械教育初探_Python