【转】RabbitMQ六种队列模式-3.发布订阅模式
前言
RabbitMQ六種隊(duì)列模式-簡(jiǎn)單隊(duì)列
RabbitMQ六種隊(duì)列模式-工作隊(duì)列
RabbitMQ六種隊(duì)列模式-發(fā)布訂閱?[本文]
RabbitMQ六種隊(duì)列模式-路由模式
RabbitMQ六種隊(duì)列模式-主題模式
上文的工作隊(duì)列模式是直接在生產(chǎn)者與消費(fèi)者里聲明好一個(gè)隊(duì)列,這種情況下消息只會(huì)對(duì)應(yīng)同類型的消費(fèi)者。
顯然這種只處理同種類型的消息是有弊端的。
舉個(gè)用戶注冊(cè)的列子
門戶網(wǎng)站,用戶在注冊(cè)完后一般都會(huì)發(fā)送消息通知用戶注冊(cè)成功(失敗)。
如果在一個(gè)系統(tǒng)中,用戶注冊(cè)信息有郵箱、手機(jī)號(hào),那么在注冊(cè)完后會(huì)向郵箱和手機(jī)號(hào)都發(fā)送注冊(cè)完成信息(假設(shè)都發(fā)送)。
利用 MQ 實(shí)現(xiàn)業(yè)務(wù)異步處理,如果是用工作隊(duì)列的話,就會(huì)聲明一個(gè)注冊(cè)信息隊(duì)列。注冊(cè)完成之后生產(chǎn)者會(huì)向隊(duì)列提交一條注冊(cè)數(shù)據(jù),消費(fèi)者取出數(shù)據(jù)同時(shí)向郵箱以及手機(jī)號(hào)發(fā)送兩條消息。但是實(shí)際上郵箱和手機(jī)號(hào)信息發(fā)送實(shí)際上是不同的業(yè)務(wù)邏輯,不應(yīng)該放在一塊處理。
這個(gè)時(shí)候就可以利用發(fā)布/訂閱模式將消息發(fā)送到轉(zhuǎn)換機(jī)(EXCHANGE),聲明兩個(gè)不同的隊(duì)列(郵箱、手機(jī)),并綁定到交換機(jī)。這樣生產(chǎn)者只需要發(fā)布一次消息,兩個(gè)隊(duì)列都會(huì)接收到消息發(fā)給對(duì)應(yīng)的消費(fèi)者,大致如下圖所示。
在應(yīng)用中,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。
文章目錄
1. 什么是發(fā)布訂閱模式2. 代碼部分2.1 生產(chǎn)者2.2 郵件消費(fèi)者2.3 短信消費(fèi)者2.4 運(yùn)行截圖3. 總結(jié)
1. 什么是發(fā)布訂閱模式
簡(jiǎn)單解釋就是,可以將消息發(fā)送給不同類型的消費(fèi)者。做到發(fā)布一次,消費(fèi)多個(gè)。下圖取自于官方網(wǎng)站(RabbitMQ)的發(fā)布/訂閱模式的圖例:
P 表示為生產(chǎn)者、 X 表示交換機(jī)、C1C2 表示為消費(fèi)者,紅色表示隊(duì)列。
下面代碼部分會(huì)展示郵件、短信的例子,通過綁定到一個(gè)交換機(jī),但是
2. 代碼部分
2.1 生產(chǎn)者
public?class?ProducerFanout?{private?static?final?String?EXCHANGE_NAME?=?"fanout_exchange";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{/**?1.創(chuàng)建新的連接?*/Connection?connection?=?MQConnectionUtils.newConnection();/**?2.創(chuàng)建通道?*/Channel?channel?=?connection.createChannel();/**?3.綁定的交換機(jī)?參數(shù)1交互機(jī)名稱?參數(shù)2?exchange類型?*/channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");/**?4.發(fā)送消息?*/for?(int?i?=?0;?i?<?10;?i++){String?message?=?"用戶注冊(cè)消息:"?+?i;System.out.println("[send]:"?+?message);//發(fā)送消息channel.basicPublish(EXCHANGE_NAME,?"",?null,?message.getBytes("utf-8"));try?{Thread.sleep(5?*?i);}?catch?(InterruptedException?e)?{e.printStackTrace();}}/**?5.關(guān)閉通道、連接?*/channel.close();connection.close();/**?注意:如果消費(fèi)沒有綁定交換機(jī)和隊(duì)列,則消息會(huì)丟失?*/}}代碼補(bǔ)充,channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));?其中第二個(gè)參數(shù)為空類似于表示全局廣播,只要綁定到該隊(duì)列上的消費(fèi)者理論上是都可以收到的。
2.2 郵件消費(fèi)者
public?class?ConsumerEmailFanout?{private?static?final?String?QUEUE_NAME?=?"consumerFanout_email";private?static?final?String?EXCHANGE_NAME?=?"fanout_exchange";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{System.out.println("郵件消費(fèi)者啟動(dòng)");/*?1.創(chuàng)建新的連接?*/Connection?connection?=?MQConnectionUtils.newConnection();/*?2.創(chuàng)建通道?*/Channel?channel?=?connection.createChannel();/*?3.消費(fèi)者關(guān)聯(lián)隊(duì)列?*/channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);/*?4.消費(fèi)者綁定交換機(jī)?參數(shù)1?隊(duì)列?參數(shù)2交換機(jī)?參數(shù)3?routingKey?*/channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"");DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{@Overridepublic?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)throws?IOException?{String?msg?=?new?String(body,?"UTF-8");System.out.println("消費(fèi)者獲取生產(chǎn)者消息:"?+?msg);}};/*?5.消費(fèi)者監(jiān)聽隊(duì)列消息?*/channel.basicConsume(QUEUE_NAME,?true,?consumer);}}代碼補(bǔ)充,?channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");?中第三個(gè)參數(shù)置為空時(shí),可以接收到生產(chǎn)者所有的消息(生產(chǎn)者 routingKey 參數(shù)為空時(shí))。
2.3 短信消費(fèi)者
public?class?ConsumerSMSFanout?{private?static?final?String?QUEUE_NAME?=?"ConsumerFanout_sms";private?static?final?String?EXCHANGE_NAME?=?"fanout_exchange";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{System.out.println("短信消費(fèi)者啟動(dòng)");/*?1.創(chuàng)建新的連接?*/Connection?connection?=?MQConnectionUtils.newConnection();/*?2.創(chuàng)建通道?*/Channel?channel?=?connection.createChannel();/*?3.消費(fèi)者關(guān)聯(lián)隊(duì)列?*/channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);/*?4.消費(fèi)者綁定交換機(jī)?參數(shù)1?隊(duì)列?參數(shù)2交換機(jī)?參數(shù)3?routingKey?*/channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"");DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{@Overridepublic?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)throws?IOException?{String?msg?=?new?String(body,?"UTF-8");System.out.println("消費(fèi)者獲取生產(chǎn)者消息:"?+?msg);}};/*?5.消費(fèi)者監(jiān)聽隊(duì)列消息?*/channel.basicConsume(QUEUE_NAME,?true,?consumer);}}代碼補(bǔ)充,?channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");?中第三個(gè)參數(shù)置為空時(shí),可以接收到生產(chǎn)者所有的消息(生產(chǎn)者 routingKey 參數(shù)為空時(shí))
2.4 運(yùn)行截圖
先運(yùn)行兩個(gè)消費(fèi)者,再運(yùn)行生產(chǎn)者。如果沒有提前將隊(duì)列綁定到交換機(jī),那么直接運(yùn)行生產(chǎn)者的話,消息是不會(huì)發(fā)到任何隊(duì)列里的。
生產(chǎn)者
短信消費(fèi)者
郵件消費(fèi)者
3. 總結(jié)
首先相對(duì)于工作模式,發(fā)布訂閱模式引入了交換機(jī)的概念,相對(duì)其類型上更加靈活廣泛一些。通過上文我們可以總結(jié)如下:
1.生產(chǎn)者不是直接操作隊(duì)列,而是將數(shù)據(jù)發(fā)送給交換機(jī),由交換機(jī)將數(shù)據(jù)發(fā)送給與之綁定的隊(duì)列。從不加特定參數(shù)的運(yùn)行結(jié)果中可以看到,兩種類型的消費(fèi)者(email,sms)都收到相同數(shù)量的消息。
必須聲明交換機(jī),并且設(shè)置模式:channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),其中 fanout 指分發(fā)模式(將每一條消息都發(fā)送到與交換機(jī)綁定的隊(duì)列)。
隊(duì)列必須綁定交換機(jī):channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
生產(chǎn)者發(fā)送消息到交換機(jī),多個(gè)消費(fèi)者聲明多個(gè)隊(duì)列,與交換機(jī)進(jìn)行綁定,隊(duì)列中的消息可以被所有消費(fèi)者消費(fèi),類似于QQ群消息
案例代碼:https://www.lanzous.com/i5ydu6d
總結(jié)
以上是生活随笔為你收集整理的【转】RabbitMQ六种队列模式-3.发布订阅模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【转】刨根究底字符编码之九——字符编码方
- 下一篇: 【转】C#中[STAThread]的作用