(需求实战_进阶_06)SSM集成RabbitMQ 订阅模式 关键代码讲解、开发、测试
背景:
為了減輕服務器的壓力,現在原有項目的基礎上集成消息隊列來異步處理消息!
此項目是企業真實需求,項目的代碼屬于線上生產代碼,直接用于生產即可!
此項目采用MQ發送消息模式為:訂閱模式,如果對RabbitMQ不熟悉,請學習RabbitMQ專欄進行相關知識點的學習!遇到問題,可以給我留言!看到后定會回復!
文章目錄
- 一、RabbitMQ 訂閱模式快速入門
- 1. RabbitMQ 訂閱模式簡述
- 2. RabbitMQ 訂閱模式圖示
- 3. MQ角色組成
- 二、SSM集成RabbitMQ
- 2.1. 引依賴
- 2.2. 生產者配置文件
- 2.3. 消費者配置文件
- 2.4. 連接配置文件
- 2.5. 生產者代碼
- 2.6 消費者 ①
- 2.7 消費者 ②
- 2.8 MQ工具類
- 三、啟動項目驗證
- 3.1. 啟動tomcat7插件
- 3.2. 清空控制臺
- 四、管控臺隊列綁定交換機
- 4.1. 復制隊列名稱
- 4.2. 隊列綁定交換機
- 五、請求驗證測試
- 5.1. 生產者①請求
- 4.5. 生產者②請求
- 五、啟動RabbitMQ
- 5.1. 進入sbin目錄,雙擊運行
- 5.2. 啟動圖示
一、RabbitMQ 訂閱模式快速入門
1. RabbitMQ 訂閱模式簡述
什么是發布/訂閱模式(Publish/Subscribe)
簡單解釋就是,可以將消息發送給不同類型的消費者。做到發布一次,消費多個。
重要知識點:訂閱模式不走路由routingKey,消息隊列只綁定交換機。
2. RabbitMQ 訂閱模式圖示
解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個隊列
3、生產者沒有將消息直接發送到隊列,而是發送到了交換機
4、每個隊列都要綁定到交換機
5、生產者發送的消息,經過交換機,到達隊列,實現,一個消息被多個消費者獲取的目的
3. MQ角色組成
| ① | 生產者 | PRODUCER |
| ② | 消費者 | CONSUMER |
| ③ | 消息隊列 | QUEUE |
| ④ | 交換機 | EXCHANGE |
| ⑤ | 交換機和隊列綁定 | EXCHANGE和ROUTINGKEY綁定 |
二、SSM集成RabbitMQ
2.1. 引依賴
<!--spring整合rabbitmq--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.4.0.RELEASE</version></dependency>2.2. 生產者配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--生產者者配置如下:--><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"/><!-- 管理消息隊列 --><rabbit:admin connection-factory="connectionFactory"/><!--此處為配置文件方式 管控臺配置模式需要注釋 默認模式管控臺 Start--><!-- 定義一個隊列或者多個隊列 自動聲明--><!-- <rabbit:queue name="ORDER-CATEGORY-GBLFY-QUEUE" auto-declare="true" durable="true"/><rabbit:queue name="ORDER-USER-MENU-QUEUE" auto-declare="true" durable="true"/><rabbit:fanout-exchange name="ORDER-TRACE-EXCHANGE"><rabbit:bindings><!– 可綁定多個隊列,發送的時候指定key進行發送 –><rabbit:binding queue="ORDER-CATEGORY-GBLFY-QUEUE"/><rabbit:binding queue="ORDER-USER-MENU-QUEUE"/></rabbit:bindings></rabbit:fanout-exchange>--><!--此處為配置文件方式 管控臺配置模式需要注釋 默認模式管控臺 End--><!-- 定義交換機 自動聲明--><rabbit:fanout-exchange name="ORDER-TRACE-EXCHANGE"auto-declare="true" durable="true"/><!-- 定義MQ消息模板 --><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory" exchange="ORDER-TRACE-EXCHANGE"/> </beans>2.3. 消費者配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--消費者配置如下:--><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"/><!-- 管理消息隊列 --><rabbit:admin connection-factory="connectionFactory"/><!-- 定義一個隊列或者多個隊列 --><rabbit:queue name="ORDER-USER-MENU-QUEUE" auto-declare="true" durable="true"/><rabbit:queue name="ORDER-CATEGORY-GBLFY-QUEUE" auto-declare="true" durable="true"/><!-- 聲明多個消費者對象 --><bean id="mQSimpleMsgHandler" class="com.gblfy.order.mqhandler.MQSimpleMsgHandler"/><bean id="orderMQMonitorMsgHandler" class="com.gblfy.order.mqhandler.OrderMQMonitorMsgHandler"/><!-- 監聽隊列 --><rabbit:listener-container connection-factory="connectionFactory"><rabbit:listener ref="mQSimpleMsgHandler" method="execute" queue-names="ORDER-CATEGORY-GBLFY-QUEUE"/><rabbit:listener ref="orderMQMonitorMsgHandler" method="execute" queue-names="ORDER-USER-MENU-QUEUE"/></rabbit:listener-container> </beans>2.4. 連接配置文件
rabbitmq.host=127.0.0.1 rabbitmq.port=5672 rabbitmq.username=admin rabbitmq.password=admin rabbitmq.vhost=/admin2.5. 生產者代碼
package com.gblfy.order.controller;import com.gblfy.order.utils.MQSendMsgUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody;import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date;@Controller @Slf4j public class FanoutMQMsgSendController {@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;@RequestMapping(value = "/snedUserMQMsg", method = RequestMethod.GET)@ResponseBodypublic String snedUserMQMsg() {/*** 模擬發送數據* 1. serviceName 接口名稱* 2. type 路由routingKey*/String serviceName = "my name serviceName";String type = "user";//發送消息到MQ的交換機,通知其他系統mqSendMsgUtils.sendMsg(serviceName, type);return "snedUserMQMsg success !!!";}@RequestMapping(value = "/sendMenuMQMsg", method = RequestMethod.GET)@ResponseBodypublic String sendMenuMQMsg() {/*** 模擬發送數據* 1. serviceName 接口名稱* 2. type 路由routingKey*/String serviceName = "my name serviceName";String type = "menu";//發送消息到MQ的交換機,通知其他系統mqSendMsgUtils.sendMsg(serviceName, type);return "sendMenuMQMsg success !!!";}@RequestMapping(value = "/snedCategoryMQMsg", method = RequestMethod.GET)@ResponseBodypublic String snedCategoryMQMsg() {/*** 模擬發送數據* 1. serviceName 接口名稱* 2. type 路由routingKey*/String serviceName = "my name serviceName2";String type = "category.gblfy";//發送消息到MQ的交換機,通知其他系統mqSendMsgUtils.sendMsg(serviceName, type);return "snedCategoryMQMsg success !!!";}public static void main(String[] args) {DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(dateFormat.format(new Date()));} }2.6 消費者 ①
package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j;@Slf4j public class MQSimpleMsgHandler {private static final ObjectMapper MAPPER = new ObjectMapper();/*** 接收MQ消息** @param msg*/public void execute(String msg) {try {JsonNode jsonNode = MAPPER.readTree(msg);String serviceName = jsonNode.get("serviceName").asText();String routingKey = jsonNode.get("routingKey").asText();String currentDate = jsonNode.get("currentDate").asText();log.info("接口名稱:" + serviceName);log.info("路由routingKey:" + routingKey);log.info("當前時間:" + currentDate);} catch (Exception e) {e.printStackTrace();}} }2.7 消費者 ②
package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j;@Slf4j public class OrderMQMonitorMsgHandler {private static final ObjectMapper MAPPER = new ObjectMapper();/*** 接收MQ消息** @param msg*/public void execute(String msg) {try {JsonNode jsonNode = MAPPER.readTree(msg);String serviceName = jsonNode.get("serviceName").asText();String routingKey = jsonNode.get("routingKey").asText();String currentDate = jsonNode.get("currentDate").asText();log.info("接口名稱:" + serviceName);log.info("路由routingKey:" + routingKey);log.info("當前時間:" + currentDate);} catch (Exception e) {e.printStackTrace();}} }2.8 MQ工具類
package com.gblfy.order.utils;import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.databind.ObjectMapper; import com.gblfy.order.pojo.FisCallingTrace; import com.gblfy.order.pojo.RequestInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map;/*** MQ發送消息公用工具類* <p>* MQ發送消息模式采用 通配符模式* order.* 區配一個詞* order.# 區配一個或者多個詞* <p>** @author gblfy*/ @Component @Slf4j public class MQSendMsgUtils {//格式化時間public static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");// 日期格式//引入json工具類private static final ObjectMapper MAPPER = new ObjectMapper();@Autowired//注入發送消息模板private RabbitTemplate rabbitTemplate;/*** 發送MQ消息公用類** @param mFisCallingTrace 軌跡保存的數據對象* @param type 路由routingKey 字符串拼接* @param reqXml 請求報文* @param resXml 響應報文* @param uuid 隨機生成的uuid*/public void sendMsg(FisCallingTrace mFisCallingTrace, String type, String reqXml, String resXml, String uuid) {try {RequestInfo requestInfo = new RequestInfo().builder().fisCallingTrace(mFisCallingTrace).mReqXml(reqXml).mResXml(resXml).mUUID(uuid).serviceName(mFisCallingTrace.getServicename()).type(type).build();//發送消息到MQ的交換機,通知其他系統String jsonStr = JSON.toJSONString(requestInfo);rabbitTemplate.convertAndSend("order." + type, jsonStr);} catch (Exception e) {e.printStackTrace();}}/*** 發送MQ消息公用類** @param serviceName 接口名稱* @param type 路由routingKey*/public void sendMsg(String serviceName, String type) {try {//發送消息到MQ的交換機,通知其他系統Map<String, Object> msg = new HashMap<String, Object>();msg.put("serviceName", serviceName);msg.put("routingKey", type);msg.put("currentDate", dateFormat.format(new Date()));rabbitTemplate.convertAndSend("order." + type, MAPPER.writeValueAsString(msg));} catch (Exception e) {e.printStackTrace();}}// public static void main(String[] args) { // //使用fastjson 實體類對象轉jsonStr // User ly = new User().builder() // .id(1) // .name("ly") // .build(); // String jsonStr = JSON.toJSONString(ly); // log.info("轉換后jsonStr的用戶:" + jsonStr); // // //使用fastjson 進行jsonObject轉實體類對象 // String userString = "{\"id\":1,\"name\":\"ly\"}"; // // JSONObject userJson = JSONObject.parseObject(userString); // User user = JSON.toJavaObject(userJson, User.class); // // log.info("用戶姓名:" + user.getName()); // log.info("用戶ID:" + user.getId()); // } }三、啟動項目驗證
3.1. 啟動tomcat7插件
3.2. 清空控制臺
四、管控臺隊列綁定交換機
4.1. 復制隊列名稱
4.2. 隊列綁定交換機
點擊聲明的交換機
配置文件中聲明的交換機
在交換機中綁定【ORDER-CATEGORY-GBLFY-QUEUE】隊列
在交換機中綁定【ORDER-CATEGORY-GBLFY-QUEUE】隊列
在交換機中綁定【ORDER-USER-MENU-QUEUE】隊列
在交換機菜單中查看,綁定的隊列
在隊列菜單中查看,綁定的交換機
五、請求驗證測試
5.1. 生產者①請求
http://localhost:8888/snedUserMQMsg
4.5. 生產者②請求
http://localhost:8888/sendMenuMQMsg
五、啟動RabbitMQ
5.1. 進入sbin目錄,雙擊運行
5.2. 啟動圖示
總結
以上是生活随笔為你收集整理的(需求实战_进阶_06)SSM集成RabbitMQ 订阅模式 关键代码讲解、开发、测试的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringBoot集成flowable
- 下一篇: (vue基础试炼_03)使用vue.js