rabbitMq工作模式特性及整合springboot
因為公司項目后面需要用到mq做數據的同步,所以學習mq并在此記錄,這里的是rabbitMq
mq(message queue)消息隊列
官網:www.rabbitmq.com 使用消息隊列的優點:1、異步可加快訪問速度 (以前一個訂單接口需要做下單、庫存、付款、快遞等相關操作,有了mq只需要給相關信息傳入隊列,下單、庫存、付款、快遞等相關操作會自動從隊列中收到信息進行異步操作)2、解耦下游服務或其他服務或語言可接入3、削峰高并發訪問量可分攤多個隊列分攤 缺點:1、系統可用性降低(一旦mq掛了系統就宕機了)2、系統復雜性增大 (增加了mq模塊需要考慮更多)RabbitMQ的高級特性
- 消費端限流
- TTL 全稱time to live(存活時間/過期時間) - 當消息到達存活時間后還沒被消費會被丟棄 ttl+死信隊列可實現延遲隊列效果
- 死信隊列
- 延遲隊列
- 消息可靠性投遞
- Consumer ACK
rabbitMq為了確保消息投遞的可靠性提供了兩種方式 confirm和return
rabbitmq整個消息投遞的路徑為 producer--->rabbitmq broker--->exchange--->queue--->consumer 1.消息從producer到exchange則會返回一個confirmCallback. 2.消息從exchange到queue投遞失敗則會返回一個returnCallBack. 我們將利用這兩個callback控制消息的可靠性投遞Consumer ACK
ack指acknowledge,確認。表示消費者端接收到消息后的確認方式 有三種方式確認:自動確認:acknowledge="none"手動確認:acknowledge="manual"根據異常情況確認:acknowledge="auto"自動確認指,當消息一旦被消費者接收到,則自動確認收到,并將相應的message從mq的消息緩存中移除。 但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那么該消息就會丟失。 如果設置了手動確認模式,則需要在業務處理成功后,調用channel.basicAck(),手動簽收,如果出現異常, 則調用channel.basicNack()方法,讓其自動重新發送消息。我這里學習了前面五種
1:簡單模式
2:工作隊列模式
3:發布訂閱模式
4:路由模式
5:主題模式
簡單模式:即一條線一個發送到隊列,隊列發送到接收者
工作隊列模式:即有一個發送者發送信息到隊列,隊列發給多個接收者,比如群發
發布訂閱模式:這個是使用的最多的,發布者需要先發送到交換機,交換機再發送到與之綁定的隊列, 然后隊列在發送到與之綁定隊列的接收者
路由模式:路由模式在發布訂閱上增加了條件篩選,在消息到達交換機后發送隊列時進行條件匹配,匹配成功才能發送給對應綁定的隊列,最后再發送給接收者
主題模式:主題模式在路由模式上面進行升級,條件可進行模糊匹配,通配符規則 #可以匹配多個詞 * 只能匹配一個詞 如:test.# 匹配 test.one.tow test.one.q.wqe / test.* 匹配 test.one test.two
先安裝rabbitMq,不同的環境可安裝相關的版本,我這里已經安裝好了
然后運行sbin下面的rabbitmq-server.bat
然后網頁localhost:15672,如下頁面即安裝成功
然后去rabbitmq的官網
左邊是下載右邊是文檔
文檔中也會有一些代碼案例,點擊文檔可以看到mq有七種方式
第一個是在測試的時候需要引入的包,第二個是在springboot上需要引入的包
com.rabbitmq
amqp-client
5.3.0
一:簡單模式
我給mq的連接封裝在工具類里,一些隊列名放在常量類里了
工具類代碼:
常量類代碼:
package com.lansi.realtynavi.test.constant;/*** @Description 描述* @Date 2021/3/23 11:01* @Created by huyao*/ public class MqConstant {public static final String MQ_HELLO_WORD = "helloWord";public static final String MQ_PUBLISH = "publish";public static final String MQ_ROUTING = "routing";public static final String MQ_TOPICS = "topics";public static final String MQ_WORK_QUEUES = "workQueues";public static final String MQ_QUEUE_BAIDU = "baidu";public static final String MQ_QUEUE_XINLANG = "xinlang";public static final String MQ_PUBLISH_JHJ = "jiaohuanji";public static final String MQ_ROUTING_JHJ = "jiaohuanjiRout";public static final String MQ_TOPIC_JHJ = "jiaohuanjiTopic";}生產者代碼
package com.lansi.realtynavi.test.helloWord;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;/*** @Description 簡單模式* @Date 2021/3/22 17:19* @Created by huyao*/ public class Producer {public static void main(String[] args) throws Exception{Channel channel = null;Connection connection = null;try {//獲取長連接connection = RabbitUtils.getConnection();channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);String message = "這是我發送的第三個隊列消息";//第一個參數是交換機信息 簡單隊列不需要交換機 第二個參數隊列名稱 ,第三個額外信息,第四個需要發布的信息channel.basicPublish("", MqConstant.MQ_HELLO_WORD, null, message.getBytes());System.out.println("[x] Send ‘" + message + "’");}catch (Exception e){e.printStackTrace();}finally {channel.close();connection.close();}}}消費者代碼:
package com.lansi.realtynavi.test.helloWord;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 描述* @Date 2021/3/22 17:27* @Created by huyao*/ public class Consumer {public static void main(String[] argv) throws Exception {//連接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//聲明并創建一個隊列//參數1 隊列ID//參數2 是否持久化,false對應不持久化數據,mq停掉數據就會丟失//參數3 是否隊列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用//參數4 是否自動刪除, false代表連接停掉后不自動刪除這個隊列// 其他額外的參數,nullchannel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);//從MQ服務器中獲取數據//創建一個消息消費者//參數1:隊列ID//參數2:代表是否自動確認收到消息,false代表手動編程來確認消息,這是mq的推薦做法//參數3:參數要傳入的DefaultConsumer的實現類channel.basicConsume(MqConstant.MQ_HELLO_WORD, false, new Reciver(channel));} }class Reciver extends DefaultConsumer {private Channel channel;//重寫構造函數,Channel通道對象需要從外層傳入,在handleDelivery中用到public Reciver(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("消費者接收到的消息:"+message);System.out.println("消息的ID:"+envelope.getDeliveryTag());//false只確認簽收當前的消息,設置為true的時候則代表簽收該消費者所有未簽收的消息channel.basicAck(envelope.getDeliveryTag(), false);} }測試的時候隊列需要手動去創建,不過springboot的話可以自動創建
這里已經手動創建好了
運行接收者,運行啟動者
這里接收者自動接收消息
二:工作隊列模式
一個隊列多個接收者生產者代碼:
package com.lansi.realtynavi.test.workQueues;import com.google.gson.Gson; import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;/*** @Description 工作隊列模式* @Date 2021/3/22 17:33* @Created by huyao*/ public class Producer {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);for(int i = 1; i<=20; i++){SMS sms = new SMS("乘客" + i, "123456789", "你的車票已預訂成功");String message = new Gson().toJson(sms);channel.basicPublish("", MqConstant.MQ_WORK_QUEUES, null, message.getBytes());}System.out.println("發送數據成功");channel.close();connection.close();}}封裝對象代碼:
package com.lansi.realtynavi.test.workQueues;/*** @Description 描述* @Date 2021/3/23 11:28* @Created by huyao*/ public class SMS {private String name;private String mobile;private String content;public SMS(String name, String mobile, String content) {this.name = name;this.mobile = mobile;this.content = content;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMobile() {return mobile;}public void setMobile(String mobile) {this.mobile = mobile;}public String getContent() {return content;}public void setContent(String content) {this.content = content;} }三個接收者代碼
接收者1
package com.lansi.realtynavi.test.workQueues;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 描述* @Date 2021/3/23 11:33* @Created by huyao*/ public class Consumer1 {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);//如果不寫baiscQos(1) 則自動mq會將所有請求平均發送給所有消費者//baiscQos,mq不再對消費者一次發送多個請求,而是消費者處理完一個消息后(確認后),再從隊列中獲取一個新的channel.basicQos(1);//處理完一個取一個channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("smsConsumer1-短信發送成功:"+message);//服務器好的話可以在這里睡眠 這里可動態配置開啟和設置睡眠時間/*try {Thread.sleep(10);}catch (Exception e){e.printStackTrace();}*/channel.basicAck(envelope.getDeliveryTag(), false);}});} }接收者2
package com.lansi.realtynavi.test.workQueues;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 描述* @Date 2021/3/23 11:40* @Created by huyao*/ public class Consumer2 {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("smsConsumer2-短信發送成功:"+message);channel.basicAck(envelope.getDeliveryTag(), false);}});}}接收者3
package com.lansi.realtynavi.test.workQueues;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 描述* @Date 2021/3/23 11:41* @Created by huyao*/ public class Consumer3 {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("smsConsumer1-短信發送成功:"+message);channel.basicAck(envelope.getDeliveryTag(), false);}});}}啟動三個接收類,啟動發送類
三:發布訂閱模式
生成者代碼:
這里和前面兩種模式不同,發送者綁定了交換機,沒用綁定隊列,需要消費者綁定交換機和隊列
package com.lansi.realtynavi.test.publish;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;import java.util.Scanner;/*** @Description 發布訂閱模式* @Date 2021/3/23 13:31* @Created by huyao*/ public class Producer {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);String input = new Scanner(System.in).next();//第一個參數交換機名字,其他參數和之前一樣channel.basicPublish(MqConstant.MQ_PUBLISH_JHJ, "", null, input.getBytes());channel.close();connection.close();} }接收者1代碼:
package com.lansi.realtynavi.test.publish;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消費者* @Date 2021/3/23 13:50* @Created by huyao*/ public class ConsumerXinLang {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);//隊列綁定交換機//參數1:隊列名,參數2:交換機名,參數3:路由key(目前用不到,路由模式通配符模式使用)channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_PUBLISH_JHJ, "");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者新浪收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}接收者2代碼:
package com.lansi.realtynavi.test.publish;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消費者* @Date 2021/3/23 13:50* @Created by huyao*/ public class ConsumerBaiDu {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);//隊列綁定交換機 目前交換機需要在rabbit也手動創建,在和spring整合的時候spring會自動幫我們創建//參數1:隊列名,參數2:交換機名,參數3:路由key(目前用不到,路由模式通配符模式使用)channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_PUBLISH_JHJ, "");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者百度收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}啟動生產者消費者,在生產者控制臺輸入信息:
兩個消費者都接收到了
四 路由模式
路由模式發送需要攜帶路由key,用作接收者進行判斷生產者代碼:
package com.lansi.realtynavi.test.routing;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map;/*** @Description 路由模式* @Date 2021/3/23 13:31* @Created by huyao*** 交換機類型:fanout廣播(發布訂閱) direct轉發(路由) topic通配符(通配模式)**/ public class Producer {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);LinkedHashMap<String, String> map = new LinkedHashMap<>();map.put("test1","測試一數據");map.put("test2","測試二數據");map.put("test3","測試三數據");map.put("test4","測試四數據");map.put("test5","測試五數據");map.put("test6","測試六數據");map.put("test7","測試七數據");Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();while (iterator.hasNext()){Map.Entry<String, String> next = iterator.next();//第一個參數交換機名字,第二個參數指定rout_keychannel.basicPublish(MqConstant.MQ_ROUTING_JHJ, next.getKey(), null, next.getValue().getBytes());}channel.close();connection.close();}}接收者1:
package com.lansi.realtynavi.test.routing;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消費者* @Date 2021/3/23 13:50* @Created by huyao*/ public class ConsumerBaiDu {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);//隊列綁定交換機 目前交換機需要在rabbit也手動創建,在和spring整合的時候spring會自動幫我們創建//參數1:隊列名,參數2:交換機名,參數3:路由key(目前用不到,路由模式通配符模式使用)channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test1");channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test2");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者百度收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}接收者二
package com.lansi.realtynavi.test.routing;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消費者* @Date 2021/3/23 13:50* @Created by huyao*/ public class ConsumerXinLang {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);//隊列綁定交換機//參數1:隊列名,參數2:交換機名,參數3:路由keychannel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test10");channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test6");channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test5");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者新浪收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}
在這里看到百度接收者只接受test1、test2,所以只接收到了1和2的數據,新浪同理
五 主題模式
在路由的基礎上增加了通配符匹配通配符規則 #可以匹配多個詞 * 只能匹配一個詞生產者代碼:
package com.lansi.realtynavi.test.topics;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map;/*** @Description 通配符模式* @Date 2021/3/23 13:31* @Created by huyao*** 交換機類型:fanout廣播(發布訂閱) direct轉發(路由) topic通配符(通配模式)** 通配符規則 #可以匹配多個詞 * 只能匹配一個詞* test.# test.one.tow test.one.q.wqe / test.* test.one test.two*/ public class Producer {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_TOPIC_JHJ, false, false, false, null);LinkedHashMap<String, String> map = new LinkedHashMap<>();map.put("test.one","測試一數據");map.put("test2.two.one","測試二數據");map.put("test.wqe","測試三數據");map.put("test4.com.hash.oqp","測試四數據");map.put("test5.com.code.oqp","測試五數據");map.put("test6.com.code.oqp","測試六數據");Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();while (iterator.hasNext()){Map.Entry<String, String> next = iterator.next();//第一個參數交換機名字,第二個參數指定rout_keychannel.basicPublish(MqConstant.MQ_TOPIC_JHJ, next.getKey(), null, next.getValue().getBytes());}channel.close();connection.close();}}接收者1代碼:
package com.lansi.realtynavi.test.topics;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消費者* @Date 2021/3/23 13:50* @Created by huyao*/ public class ConsumerBaiDu {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);//隊列綁定交換機 目前交換機需要在rabbit也手動創建,在和spring整合的時候spring會自動幫我們創建//參數1:隊列名,參數2:交換機名,參數3:路由key(目前用不到,路由模式通配符模式使用)channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_TOPIC_JHJ, "*.*.*.oqp");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者百度收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}接收者2代碼
package com.lansi.realtynavi.test.topics;import com.lansi.realtynavi.test.constant.MqConstant; import com.lansi.realtynavi.test.utils.RabbitUtils; import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消費者* @Date 2021/3/23 13:50* @Created by huyao*/ public class ConsumerXinLang {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);//隊列綁定交換機//參數1:隊列名,參數2:交換機名,參數3:路由keychannel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_TOPIC_JHJ, "test.#");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者新浪收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}
最后就是springboot上整合rabbitmq
需要用到的依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency> 然后配置rabbitmq連接 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=111111 #發送者開啟confirm確認機制 spring.rabbitmq.publisher-confirms=true #發送者開啟return確認機制 spring.rabbitmq.publisher-returns=true#開啟ackspring.rabbitmq.listener.type=simple spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.default-requeue-rejected=false接下來一個rabbitmq的配置
package com.lansi.realtynavi.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @Description mq的配置* @Date 2021/3/24 14:19* @Created by huyao*/ @Configuration public class RabbitMqConfig {//定義交換機的名字public static final String EXCHANGE_NAME = "boot_topic_exchange";public static final String QUEUE_NAME = "boot_queue";//1.聲明交換機@Bean("bootExchange")public Exchange bootExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//2.聲明隊列@Bean("bootQueue")public Queue bootQueue(){return QueueBuilder.durable(QUEUE_NAME).build();}//3.綁定@Beanpublic Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();} }接收者
package com.lansi.realtynavi.config;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** @Description mq監聽/消費者手動簽收消息* @Date 2021/3/24 14:44* @Created by huyao**rabbitmq給了兩種消息的可靠性 confirm和return**/ @Component public class RabbitMqConsumer {//可監聽分布式其他項目,只要mq連接的地址相同監聽的隊列名存在即可//消費者@RabbitListener(queues = "boot_queue")public void ListenerQueue(Message message, Channel channel) throws Exception{System.out.println("消費者接收到消息:"+new String(message.getBody()));try{//開始業務處理System.out.println("開始業務處理");//int i = 5/0;System.out.println("業務處理完成");//業務處理完成確認收到消息 , 第二個參數為true支持多消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}catch (Exception e){System.out.println("業務處理異常");//業務異常,拒收消息,請求重發 參數三為true則重回隊列發送channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);}}}這里的生產者我寫的一個controller中的列子(錯誤示范,只能調用一次)
testTopic1 是測試mq的高級特性,這里只用到testTopic就可以
package com.lansi.realtynavi.rabbitmq;import com.lansi.realtynavi.config.RabbitMqConfig; import com.lansi.realtynavi.dev.helloWord.HelloSender; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;/*** @Description 描述* @Date 2021/3/24 13:46* @Created by huyao*/ @RestController @RequestMapping("api/rabbitMq") public class RabbitMqController {@Autowiredprivate HelloSender helloSender;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("helloWorld")public void hello(){helloSender.send();}@GetMapping("testTopic")public void testTopic(){rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.hhh", "topic的mq.......");}//mq的可靠性機制,必須要在配置文件中開啟@GetMapping("testTopic1")public void testTopic1(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm方法被執行了。。。");if(b){System.out.println("交換機確認成功!!");} else {System.out.println("交換機確認失敗!!");}}});//設置交換機處理失敗消息的模式,為true的時候,消息打到不了隊列時,會將消息重新返回給生產者rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** @param message 消息對象* @param returnCode 錯誤碼* @param returnText 錯誤信息* @param exchange 交換機* @param routingKey 路由鍵** */@Overridepublic void returnedMessage(Message message, int returnCode, String returnText, String exchange,String routingKey) {System.out.println("return被執行了。。。");System.out.println("message:"+new String(message.getBody()));System.out.println("錯誤碼:"+returnCode);System.out.println("錯誤信息:"+returnText);System.out.println("交換機:"+exchange);System.out.println("路由鍵:"+routingKey);}});rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"abc.boot.hhh", "topic的mq.......");}}運行后掉對應的接口,消費者接收
這樣rabbitmq就整合進springboot中了
總結
以上是生活随笔為你收集整理的rabbitMq工作模式特性及整合springboot的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 路由器交换与配置综合实验(二)外网
- 下一篇: Sentinel