RabbitMQ六种队列模式-发布订阅模式
前言
RabbitMQ六種隊(duì)列模式-簡(jiǎn)單隊(duì)列
RabbitMQ六種隊(duì)列模式-工作隊(duì)列
RabbitMQ六種隊(duì)列模式-發(fā)布訂閱 [本文]
RabbitMQ六種隊(duì)列模式-路由模式
RabbitMQ六種隊(duì)列模式-主題模式
上文的工作隊(duì)列模式是直接在生產(chǎn)者與消費(fèi)者里聲明好一個(gè)隊(duì)列,這種情況下消息只會(huì)對(duì)應(yīng)同類型的消費(fèi)者。
顯然這種只處理同種類型的消息是有弊端的。
舉個(gè)用戶注冊(cè)的列子
門戶網(wǎng)站,用戶在注冊(cè)完后一般都會(huì)發(fā)送消息通知用戶注冊(cè)成功(失敗)。
如果在一個(gè)系統(tǒng)中,用戶注冊(cè)信息有郵箱、手機(jī)號(hào),那么在注冊(cè)完后會(huì)向郵箱和手機(jī)號(hào)都發(fā)送注冊(cè)完成信息(假設(shè)都發(fā)送)。
利用 MQ 實(shí)現(xiàn)業(yè)務(wù)異步處理,如果是用工作隊(duì)列的話,就會(huì)聲明一個(gè)注冊(cè)信息隊(duì)列。注冊(cè)完成之后生產(chǎn)者會(huì)向隊(duì)列提交一條注冊(cè)數(shù)據(jù),消費(fèi)者取出數(shù)據(jù)同時(shí)向郵箱以及手機(jī)號(hào)發(fā)送兩條消息。但是實(shí)際上郵箱和手機(jī)號(hào)信息發(fā)送實(shí)際上是不同的業(yè)務(wù)邏輯,不應(yīng)該放在一塊處理。
這個(gè)時(shí)候就可以利用發(fā)布/訂閱模式將消息發(fā)送到轉(zhuǎn)換機(jī)(EXCHANGE),聲明兩個(gè)不同的隊(duì)列(郵箱、手機(jī)),并綁定到交換機(jī)。這樣生產(chǎn)者只需要發(fā)布一次消息,兩個(gè)隊(duì)列都會(huì)接收到消息發(fā)給對(duì)應(yīng)的消費(fèi)者,大致如下圖所示。
在應(yīng)用中,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。
文章目錄
文章目錄
- 前言
- 舉個(gè)用戶注冊(cè)的列子
- 文章目錄
- 1. 什么是發(fā)布訂閱模式
- 2. 代碼部分
- 2.1 生產(chǎn)者
- 2.2 郵件消費(fèi)者
- 2.3 短信消費(fèi)者
- 2.4 運(yùn)行截圖
- 3. 總結(jié)
1. 什么是發(fā)布訂閱模式
簡(jiǎn)單解釋就是,可以將消息發(fā)送給不同類型的消費(fèi)者。做到發(fā)布一次,消費(fèi)多個(gè)。下圖取自于官方網(wǎng)站(RabbitMQ)的發(fā)布/訂閱模式的圖例:
P 表示為生產(chǎn)者、 X 表示交換機(jī)、C1C2 表示為消費(fèi)者,紅色表示隊(duì)列。
下面代碼部分會(huì)展示郵件、短信的例子,通過綁定到一個(gè)交換機(jī),但是
2. 代碼部分
2.1 生產(chǎn)者
public class ProducerFanout {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {/** 1.創(chuàng)建新的連接 */Connection connection = MQConnectionUtils.newConnection();/** 2.創(chuàng)建通道 */Channel channel = connection.createChannel();/** 3.綁定的交換機(jī) 參數(shù)1交互機(jī)名稱 參數(shù)2 exchange類型 */channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/** 4.發(fā)送消息 */for (int i = 0; i < 10; i++){String message = "用戶注冊(cè)消息:" + i;System.out.println("[send]:" + message);//發(fā)送消息channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));try {Thread.sleep(5 * i);} catch (InterruptedException e) {e.printStackTrace();}}/** 5.關(guān)閉通道、連接 */channel.close();connection.close();/** 注意:如果消費(fèi)沒有綁定交換機(jī)和隊(duì)列,則消息會(huì)丟失 */}}代碼補(bǔ)充,channel.basicPublish(EXCHANGE_NAME, “”, null, message.getBytes(“utf-8”)); 其中第二個(gè)參數(shù)為空類似于表示全局廣播,只要綁定到該隊(duì)列上的消費(fèi)者理論上是都可以收到的。
2.2 郵件消費(fèi)者
public class ConsumerEmailFanout {private static final String QUEUE_NAME = "consumerFanout_email";private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("郵件消費(fèi)者啟動(dòng)");/* 1.創(chuàng)建新的連接 */Connection connection = MQConnectionUtils.newConnection();/* 2.創(chuàng)建通道 */Channel channel = connection.createChannel();/* 3.消費(fèi)者關(guān)聯(lián)隊(duì)列 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/* 4.消費(fèi)者綁定交換機(jī) 參數(shù)1 隊(duì)列 參數(shù)2交換機(jī) 參數(shù)3 routingKey */channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String msg = new String(body, "UTF-8");System.out.println("消費(fèi)者獲取生產(chǎn)者消息:" + msg);}};/* 5.消費(fèi)者監(jiān)聽隊(duì)列消息 */channel.basicConsume(QUEUE_NAME, true, consumer);}}代碼補(bǔ)充, channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”); 中第三個(gè)參數(shù)置為空時(shí),可以接收到生產(chǎn)者所有的消息(生產(chǎn)者 routingKey 參數(shù)為空時(shí))。
2.3 短信消費(fèi)者
public class ConsumerSMSFanout {private static final String QUEUE_NAME = "ConsumerFanout_sms";private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("短信消費(fèi)者啟動(dòng)");/* 1.創(chuàng)建新的連接 */Connection connection = MQConnectionUtils.newConnection();/* 2.創(chuàng)建通道 */Channel channel = connection.createChannel();/* 3.消費(fèi)者關(guān)聯(lián)隊(duì)列 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/* 4.消費(fèi)者綁定交換機(jī) 參數(shù)1 隊(duì)列 參數(shù)2交換機(jī) 參數(shù)3 routingKey */channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String msg = new String(body, "UTF-8");System.out.println("消費(fèi)者獲取生產(chǎn)者消息:" + msg);}};/* 5.消費(fèi)者監(jiān)聽隊(duì)列消息 */channel.basicConsume(QUEUE_NAME, true, consumer);}}代碼補(bǔ)充, channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”); 中第三個(gè)參數(shù)置為空時(shí),可以接收到生產(chǎn)者所有的消息(生產(chǎn)者 routingKey 參數(shù)為空時(shí))
2.4 運(yùn)行截圖
先運(yùn)行兩個(gè)消費(fèi)者,再運(yùn)行生產(chǎn)者。如果沒有提前將隊(duì)列綁定到交換機(jī),那么直接運(yùn)行生產(chǎn)者的話,消息是不會(huì)發(fā)到任何隊(duì)列里的。
生產(chǎn)者
短信消費(fèi)者
郵件消費(fèi)者
3. 總結(jié)
首先相對(duì)于工作模式,發(fā)布訂閱模式引入了交換機(jī)的概念,相對(duì)其類型上更加靈活廣泛一些。通過上文我們可以總結(jié)如下:
1.生產(chǎn)者不是直接操作隊(duì)列,而是將數(shù)據(jù)發(fā)送給交換機(jī),由交換機(jī)將數(shù)據(jù)發(fā)送給與之綁定的隊(duì)列。從不加特定參數(shù)的運(yùn)行結(jié)果中可以看到,兩種類型的消費(fèi)者(email,sms)都收到相同數(shù)量的消息。
必須聲明交換機(jī),并且設(shè)置模式:channel.exchangeDeclare(EXCHANGE_NAME, “fanout”),其中 fanout 指分發(fā)模式(將每一條消息都發(fā)送到與交換機(jī)綁定的隊(duì)列)。
隊(duì)列必須綁定交換機(jī):channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”);
生產(chǎn)者發(fā)送消息到交換機(jī),多個(gè)消費(fèi)者聲明多個(gè)隊(duì)列,與交換機(jī)進(jìn)行綁定,隊(duì)列中的消息可以被所有消費(fèi)者消費(fèi),類似于QQ群消息
案例代碼:https://www.lanzous.com/i5ydu6d
我創(chuàng)建了一個(gè)java相關(guān)的公眾號(hào),用來記錄自己的學(xué)習(xí)之路,感興趣的小伙伴可以關(guān)注一下微信公眾號(hào)哈:niceyoo
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ六种队列模式-发布订阅模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 编写TreeSet类的实现程序,其中相关
- 下一篇: 希赛软件设计师视频教程-3.1 进程(第