javascript
SpringBoot2.0 整合 RocketMQ ,实现请求异步处理
一、RocketMQ
1、架構圖片
2、角色分類
(1)、Broker
RocketMQ 的核心,接收 Producer 發過來的消息、處理 Consumer 的消費消息請求、消息的持 久化存儲、服務端過濾功能等 。
(2)、NameServer
消息隊列中的狀態服務器,集群的各個組件通過它來了解全局的信息 。類似微服務中注冊中心的服務注冊,發現,下線,上線的概念。
熱備份:
NamServer可以部署多個,相互之間獨立,其他角色同時向多個NameServer 機器上報狀態信息。
心跳機制:
NameServer 中的 Broker、 Topic等狀態信息不會持久存儲,都是由各個角色定時上報并存儲到內存中,超時不上報的話, NameServer會認為某個機器出故障不可用。
(3)、Producer
消息的生成者,最常用的producer類就是DefaultMQProducer。
(4)、Consumer
消息的消費者,常用Consumer類
DefaultMQPushConsumer
收到消息后自動調用傳入的處理方法來處理,實時性高
DefaultMQPullConsumer
用戶自主控制 ,靈活性更高。
3、通信機制
(1)、Broker啟動后需要完成一次將自己注冊至NameServer的操作;隨后每隔30s時間定時向NameServer更新Topic路由信息。
(2)、Producer發送消息時候,需要根據消息的Topic從本地緩存的獲取路由信息。如果沒有則更新路由信息會從NameServer重新拉取,同時Producer會默認每隔30s向NameServer拉取一次路由信息。
(3)、Consumer消費消息時候,從NameServer獲取的路由信息,并再完成客戶端的負載均衡后,監聽指定消息隊列獲取消息并進行消費。
二、代碼實現案例
1、項目結構圖
版本描述
<spring-boot.version>2.1.3.RELEASE</spring-boot.version> <rocketmq.version>4.3.0</rocketmq.version>2、配置文件
rocketmq:# 生產者配置producer:isOnOff: on# 發送同一類消息的設置為同一個group,保證唯一groupName: CicadaGroup# 服務地址namesrvAddr: 127.0.0.1:9876# 消息最大長度 默認1024*4(4M)maxMessageSize: 4096# 發送消息超時時間,默認3000sendMsgTimeout: 3000# 發送消息失敗重試次數,默認2retryTimesWhenSendFailed: 2# 消費者配置consumer:isOnOff: on# 官方建議:確保同一組中的每個消費者訂閱相同的主題。groupName: CicadaGroup# 服務地址namesrvAddr: 127.0.0.1:9876# 接收該 Topic 下所有 Tagtopics: CicadaTopic~*;consumeThreadMin: 20consumeThreadMax: 64# 設置一次消費消息的條數,默認為1條consumeMessageBatchMaxSize: 1# 配置 Group Topic Tag rocket:group: rocketGrouptopic: rocketTopictag: rocketTag3、生產者配置
/*** RocketMQ 生產者配置*/ @Configuration public class ProducerConfig {private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;@Value("${rocketmq.producer.groupName}")private String groupName;@Value("${rocketmq.producer.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.producer.maxMessageSize}")private Integer maxMessageSize ;@Value("${rocketmq.producer.sendMsgTimeout}")private Integer sendMsgTimeout;@Value("${rocketmq.producer.retryTimesWhenSendFailed}")private Integer retryTimesWhenSendFailed;@Beanpublic DefaultMQProducer getRocketMQProducer() {DefaultMQProducer producer;producer = new DefaultMQProducer(this.groupName);producer.setNamesrvAddr(this.namesrvAddr);//如果需要同一個jvm中不同的producer往不同的mq集群發送消息,需要設置不同的instanceNameif(this.maxMessageSize!=null){producer.setMaxMessageSize(this.maxMessageSize);}if(this.sendMsgTimeout!=null){producer.setSendMsgTimeout(this.sendMsgTimeout);}//如果發送消息失敗,設置重試次數,默認為2次if(this.retryTimesWhenSendFailed!=null){producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);}try {producer.start();} catch (MQClientException e) {e.printStackTrace();}return producer;} }4、消費者配置
/*** RocketMQ 消費者配置*/ @Configuration public class ConsumerConfig {private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;@Value("${rocketmq.consumer.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.consumer.groupName}")private String groupName;@Value("${rocketmq.consumer.consumeThreadMin}")private int consumeThreadMin;@Value("${rocketmq.consumer.consumeThreadMax}")private int consumeThreadMax;@Value("${rocketmq.consumer.topics}")private String topics;@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")private int consumeMessageBatchMaxSize;@Resourceprivate RocketMsgListener msgListener;@Beanpublic DefaultMQPushConsumer getRocketMQConsumer(){DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr(namesrvAddr);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);consumer.registerMessageListener(msgListener);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);try {String[] topicTagsArr = topics.split(";");for (String topicTags : topicTagsArr) {String[] topicTag = topicTags.split("~");consumer.subscribe(topicTag[0],topicTag[1]);}consumer.start();}catch (MQClientException e){e.printStackTrace();}return consumer;} }5、消息監聽配置
/*** 消息消費監聽*/ @Component public class RocketMsgListener implements MessageListenerConcurrently {private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;@Resourceprivate ParamConfigService paramConfigService ;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isEmpty(list)){return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = list.get(0);LOG.info("接受到的消息為:"+new String(messageExt.getBody()));int reConsume = messageExt.getReconsumeTimes();// 消息已經重試了3次,如果不需要再次消費,則返回成功if(reConsume ==3){return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}if(messageExt.getTopic().equals(paramConfigService.rocketTopic)){String tags = messageExt.getTags() ;switch (tags){case "rocketTag":LOG.info("開戶 tag == >>"+tags);break ;default:LOG.info("未匹配到Tag == >>"+tags);break;}}// 消息消費成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} }6、配置參數綁定
@Service public class ParamConfigService {@Value("${rocket.group}")public String rocketGroup ;@Value("${rocket.topic}")public String rocketTopic ;@Value("${rocket.tag}")public String rocketTag ; }7、消息發送測試
@Service public class RocketMqServiceImpl implements RocketMqService {@Resourceprivate DefaultMQProducer defaultMQProducer;@Resourceprivate ParamConfigService paramConfigService ;@Overridepublic SendResult openAccountMsg(String msgInfo) {// 可以不使用Config中的GroupdefaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);SendResult sendResult = null;try {Message sendMsg = new Message(paramConfigService.rocketTopic,paramConfigService.rocketTag,"open_account_key", msgInfo.getBytes());sendResult = defaultMQProducer.send(sendMsg);} catch (Exception e) {e.printStackTrace();}return sendResult ;} }三、項目源碼
GitHub·地址 https://github.com/cicadasmile/middle-ware-parent GitEE·地址 https://gitee.com/cicadasmile/middle-ware-parent總結
以上是生活随笔為你收集整理的SpringBoot2.0 整合 RocketMQ ,实现请求异步处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux 搭建Sphinx 全文检索引
- 下一篇: 设计模式:结构型模式总结