當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
SpringBoot2.x RabbitMQ Nacos Nacos-Config
生活随笔
收集整理的這篇文章主要介紹了
SpringBoot2.x RabbitMQ Nacos Nacos-Config
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
文章目錄
- 一、依賴配置
- 1. 引入依賴
- 2. 配置文件
- 3. 主配置
- 二、生產者代碼代碼Conding
- 2.1. 發(fā)送客戶端
- 2.2. 確認機制
- 2.3. 消息 return機制
- 2.4. controller
- 2.5. MQ工具類
- 2.6. 常量類
- 三、消費端
- 3.2. 消費者代碼
- 3.2. RabbitMQ常用命令
一、依賴配置
1. 引入依賴
<!--服務注冊發(fā)現(xiàn)--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- SpringCloud Alibaba Nacos Config --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><!--springboot整合RabbitMQ依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>2. 配置文件
項目內部配置bootstrap.yml,
server:port: 8001 spring:application:# 應用名稱name: ly-rabbitmqprofiles:# 環(huán)境配置active: devcloud:nacos:discovery:# 服務注冊地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}nacos-config服務端配置
在這里插入代碼片3. 主配置
package com.gblfy.lyrabbitmq.config;import com.gblfy.lyrabbitmq.consts.MQPrefixConst; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** RabbitMQ 交換機和隊列綁定配置類** @author gblfy* @Date 2021-09-28 9:59*/ @Configuration public class RabbitTopicConfig {@BeanTopicExchange topicExchange() {return new TopicExchange(MQPrefixConst.WS_EXCEHANGE, true, false);}@BeanQueue hisUQ() {return new Queue("ly_mq_fai_q");}@BeanBinding hisUQBinding() {//ly-his.# 代表路由規(guī)則 表示如果路由的 routingKey 是以ly-his 開頭就會發(fā)送到 ly_mq_his_u_q 這個隊列上return BindingBuilder.bind(hisUQ()).to(topicExchange()).with("ly-fai.#");} }二、生產者代碼代碼Conding
2.1. 發(fā)送客戶端
package com.gblfy.lyrabbitmq.provider;import com.alibaba.fastjson.JSON; import com.gblfy.common.entity.Order; import com.gblfy.lyrabbitmq.consts.MQPrefixConst; import com.gblfy.lyrabbitmq.utils.MQSendMsgUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;import java.time.LocalDateTime;/*** RabbitMQ發(fā)送消息** @author gblfy* @Date 2021-09-28 9:59*/ @Service public class RabbitMQProvider {private final static Logger log = LoggerFactory.getLogger(RabbitMQProvider.class);@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;/*** 發(fā)送補償MQ消息** @param orderId 訂單編碼* @param orderNum 訂單數(shù)量* @param createTime 訂單時間* @return 發(fā)送標識*/public void sendMQContent(long orderId, String orderNum, LocalDateTime createTime) {Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 發(fā)送MQ消息到交換機通過指定消息路由key路由到指定隊列中mqSendMsgUtils.snedStrMQMsg(MQPrefixConst.LY_MQ_FAI_QUERY, JSON.toJSONString(order));log.info("MQ消息發(fā)送成功");} }2.2. 確認機制
package com.gblfy.lyrabbitmq.confirms;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;/*** Confirm 消息確認機制* 消息發(fā)送成功和失敗都會記錄** @author gblfy* @Date 2021-09-28 9:59*/ @Component("confirmCallback") public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {//日志輸出private final static Logger log = LoggerFactory.getLogger(ConfirmCallBackListener.class);/*** 生產者消息發(fā)送成功與失敗確認機制* <p>* 1. ack* true : 標志生產者將消息發(fā)出成功* false: 標志生產者將消息發(fā)出失敗* 2. ack :true 意味著消息發(fā)送成功 有2種場景* 第一種:生產者將消息成功發(fā)送到指定隊列中,等待消費者消費消息* 第兩種:生產者將消息發(fā)送成功,但是,由于無法路由到指定的消息* 隊列,這種場景的消息,會被return機制監(jiān)聽到,后續(xù)進行補償機制,做消息補發(fā)處理* </p>** @param correlationData 隊列消息的唯一標識ID,消息做補償機制會用到* @param ack ack 消息是否發(fā)送成功的標識* @param cause 消息發(fā)送失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("消息隊列標識ID: {}", correlationData.getId());/***對消息發(fā)送成功/失敗狀態(tài)做不同的處理* 1. 第一種場景 發(fā)送消息成功* 1>發(fā)送消息成功,交換機路由隊列成功* 2>發(fā)送消息成功,交換機路由隊列不成功* 2. 發(fā)送消息失敗*/if (ack) {log.info("發(fā)送消息成功: {}", ack);} else {log.info("發(fā)送消息失敗: {}", ack);}} } /*** !ack 場景結果示例:* <p>* correlationData: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568]* ack: false* 異常處理....* 消息: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568],* nack,失敗原因是:channel error;* protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'FIS-TRACE-COMMON-EXCHANGE' in vhost '/admin',* class-id=60, method-id=40)*/2.3. 消息 return機制
package com.gblfy.lyrabbitmq.returns;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;/*** 消息 return機制* 路由失敗的消息會先走這,然后到ConfirmCallBackListener記錄異常錯誤信息** @author gblfy* @Date 2021-09-28 9:59*/ @Component("returnCallBackListener") public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {//打印日志 實時定位private final static Logger log = LoggerFactory.getLogger(ReturnCallBackListener.class);/*** 消息無法路由 觸發(fā)消息 return機制* <p>* 1. 消費者在消息沒有被路由到合適隊列情況下會被return監(jiān)聽,而不會自動刪除* 2. 會監(jiān)聽到生產者發(fā)送消息的關鍵信息* 3. 根據(jù)關鍵信息,后續(xù)進行補償機制,做消息補發(fā)處理* </p>** @param message 消息實體* @param replyCode 應答碼312* @param replyText NO_ROUTE* @param exchange 交換機* @param routingKey 路由routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("DeliveryTag: {}", message.getMessageProperties().getDeliveryTag());log.info("ContentType: {}", message.getMessageProperties().getContentType());log.info("ContentEncoding: {}", message.getMessageProperties().getContentEncoding());log.info("消息發(fā)送的指定交換機: {}", exchange);log.info("隊列路由的routingKey: {}", routingKey);log.info("隊列的響應碼replyCode: {}", replyCode);log.info("隊列的響應信息: {}", replyText);//TODO 消息發(fā)送交換機成功 路由失敗 保存軌跡記錄} } /*** 場景結果示例:* return exchange: FIS-TRACE-COMMON-EXCHANGE, routingKey: fis-str.user, replyCode: 312, replyText: NO_ROUTE* correlationData: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a]* ack: true* 消息: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a],已經被ack成功*/2.4. controller
package com.gblfy.lyrabbitmq.controller;import com.gblfy.lyrabbitmq.provider.RabbitMQProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@RestController @RequestMapping("/mq") public class MQProviderController {@Autowiredprivate RabbitMQProvider mqProvider;@GetMapping("/sendMQ")public String sendMQContent() {mqProvider.sendMQContent(0001, "10", LocalDateTime.now());return "OK";} }2.5. MQ工具類
package com.gblfy.lyrabbitmq.utils;import com.gblfy.lyrabbitmq.confirms.ConfirmCallBackListener; import com.gblfy.lyrabbitmq.consts.MQPrefixConst; import com.gblfy.lyrabbitmq.returns.ReturnCallBackListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.util.UUID;/*** MQ發(fā)送消息(公用工具類)** @author gblfy* @Date 2021-09-28 9:59*/ @Component public class MQSendMsgUtils {private final static Logger log = LoggerFactory.getLogger(MQSendMsgUtils.class);@Autowired//注入發(fā)送消息模板private RabbitTemplate rabbitTemplate;@Autowiredprivate ConfirmCallBackListener confirmCallback;@Autowiredprivate ReturnCallBackListener returnCallback;/*** 發(fā)送MQ STRING類型消息 第1種** @param queueRouteKey 路由routingKey* @param msg MQ STRING類型消息*/public void snedStrMQMsg(String queueRouteKey, String msg) {try {log.info("交換機名稱: {}, 路由routingKey: {}, 發(fā)送的消息: {} ", "EXCHANGE-CMIIP", queueRouteKey, msg);String mID = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(mID);// Confirm 消息確認策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息確認策略rabbitTemplate.setReturnCallback(returnCallback);log.info("發(fā)送消息的路由key: {}", queueRouteKey);log.info("發(fā)送消息的標識ID: {}", mID);//發(fā)送消息到MQ的交換機,通知其他系統(tǒng)rabbitTemplate.convertAndSend(MQPrefixConst.WS_EXCEHANGE, queueRouteKey, msg, correlationId);} catch (Exception e) {e.printStackTrace();}} }2.6. 常量類
package com.gblfy.lyrabbitmq.consts;/*** 消息路由規(guī)則前綴(常量類)** @author gblfy* @Date 2021-09-28 9:59*/ public class MQPrefixConst {//交換機名稱//回歸環(huán)境public static final String WS_EXCEHANGE = "LY-REPORT-EXCHANGE";// 路由keypublic static final String LY_MQ_FAI_QUERY = "ly-fai.query"; }三、消費端
3.2. 消費者代碼
package com.gblfy.lyrabbitmq.consumer;import com.gblfy.lyrabbitmq.consts.MQPrefixConst; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component;/*** RabbitMQ消費端處理** @author gblfy* @Date 2021-09-28 9:59*/ @Component public class RabbitMQHandler implements ChannelAwareMessageListener {//打印日志 實時定位private final static Logger log = LoggerFactory.getLogger(RabbitMQHandler.class);/*** 接收字符串類型MQ消息** @param message* @param channel* @throws Exception*/@Override@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.str1.queue.name}",durable = "${spring.rabbitmq.listener.str1.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.str1.exchange.name}",durable = "${spring.rabbitmq.listener.str1.exchange.durable}",type = "${spring.rabbitmq.listener.str1.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.str1.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.str1.key}"))@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 創(chuàng)建一個消費端軌跡表來存儲消息的軌跡數(shù)據(jù)String jsonMsg = new String(message.getBody());log.info("響應報文 mResXml: {}", jsonMsg);// 同一時刻服務器只會發(fā)一條消息給消費者channel.basicQos(1);// 反饋消息的消費狀態(tài)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 反饋消息的消費狀態(tài)System.err.println("--------------------------------------");//------------------------------根據(jù)約定解析指定的標簽--------------------------------------------// JSONObject jsonObject = new JSONObject();// jsonObject = JSON.parseObject(jsonMsg);// String msgID = jsonObject.getString("msgID");// log.info("接收的消息ID: {}", msgID);//// String tResXml = jsonObject.getString("tResXml");// log.info("解析后的zip路徑: {}", tResXml);String queueRouteKey = message.getMessageProperties().getReceivedRoutingKey();log.info("接收的路由key: {}", queueRouteKey);if (MQPrefixConst.LY_MQ_FAI_QUERY.equals(queueRouteKey)) {//TODO 監(jiān)聽查詢接口邏輯//TODO 保存數(shù)據(jù)到數(shù)據(jù)庫} else {log.error("無此路由key: {}", queueRouteKey);}} }3.2. RabbitMQ常用命令
# 啟動MQ rabbitmq-server -detatched總結
以上是生活随笔為你收集整理的SpringBoot2.x RabbitMQ Nacos Nacos-Config的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: docker rabbitmq:3.9.
- 下一篇: @Transactional注解导致 多