RabbitMQ(三):Exchange交换器--fanout
內容翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程序
RabbitMQ(二):Work Queues、循環分發、消息確認、持久化、公平分發
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調隊列callback queue、關聯標識correlation id、實現簡單的RPC系統
RabbitMQ(七):常用方法說明 與 學習小結
Publish/Subscribe:
在上一篇博客中,我們創建了一個工作隊列:一個消息只能發送到一個工作者(消費者)中。而在這個教程中我們將會做完全不同的事情:我們發送同一個消息到多個消費者中。這種模式一般被稱為“發布/訂閱”模式。
為了演示這種模式,我們將會創建一個簡單的日志系統。它由兩個程序組成:第一個將會輸出日志消息,第二個將會接受并打印出日志消息。
在這個日志系統中,每一個接收程序(消費者)都會收到所有的消息,其中一個消費者將消息直接保存到磁盤中,而另一個消費者則將日志輸出到控制臺。
從本質上講,發布的日志消息將會廣播給所有的接收者(消費者)。
交換器Exchanges:
在之前的教程里,我們都是直接往隊列里發送消息,然后又直接從隊列里取出消息?,F在是時候介紹RabbitMQ的整個消息模型了。
先讓我們快速地回顧一下之前教程中的幾個概念:
(1)生產者:發送消息的用戶程序
(2)隊列:存儲消息的緩沖區
(3)消費者:接收消息的用戶程序
RabbitMQ的消息模型中的一個核心思想是,生產者絕不會將消息直接發送到隊列中,實際上,在大部分場景中生產者根本不知道消息會發送到哪些隊列中。
相反,生產者只會將消息發送給一個Exchange(路由器/交換器)。Exchange其實很簡單,它所做的就是,接收生產者發來的消息,并將這些消息推送到隊列中。Exchange必須清楚地知道怎么處理接收到的消息:是將消息放到一個特定的隊列中,還是放到多個隊列中,還是直接將消息丟棄。下圖示意了Exchange在消息模型中的位置:
?
Exchange一共有四種類型:direct、topic、headers 和fanout。今天的教程將會使用fanout類型的Exchange,讓我們創建一個名為logs的fanout類型的Exchange
channel.exchangeDeclare("logs", "fanout");fanout類型的Exchange非常簡單,從它的名字你可能就已經猜出來了(fanout翻譯過來是扇形的意思),它將會將接收到的消息廣播給所有它知道的隊列。這正是我們的日志系統所需要的類型。
可以通過下面的命令列出Rabbit服務器上的所有Exchange
sudo rabbitmqctl list_exchanges沒有命名的Exchange:
在前面的教程中,我們對Exchange一無所知,但是我們仍然可以將消息發送到隊列中,這可能是因為我們使用了默認的Exchange,我們是通過空字符串""來定義這個Exchange的。回想一下我們之前是怎么發布消息的:
channel.basicPublish("", "hello", null, message.getBytes()); //該方法的定義為: basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)上面代碼的方法中,第一個參數就是Exchange的名字,空字符串表示默認或無名Exchange:消息通過由routingKey定義的隊列被路由的。
現在,我們通過下面的方式來發布消息:
channel.basicPublish( "logs", "", null, message.getBytes());臨時隊列:
你可能記得之前我們使用了特定名字的隊列(還記得hello和task_queue嗎)??梢灾该饕粋€隊列這一點對我們而言至關重要,因為我們也要讓工作者指向同一個隊列。當你在生產者和消費者之間共用一個隊列時,給這個隊列取個名字就非常重要。
但這不適應于我們的日志系統。我們想讓每個消費者都接收到所有的日志消息,而不是其中的一部分日志消息。我們關心的是當前廣播的消息。為了解決這些問題,我們需要做兩件事情。
首先,無論何時我們連接到RabbitMQ服務的時候,我們都需要一個新鮮的空的隊列。為了達到這個效果,我們可以為隊列取一個隨機的名字,或者更好的是,讓RabbitMQ服務器為我們的隊列隨機起個名字。
其次,當我們關閉了消費者的時候,隊列應該自動刪除。
當我們調用無參的queueDeclare()的時候,意味著創建了一個非持久、獨享的、自動刪除的隊列,并返回一個自動生成的名字:
String queueName = channel.queueDeclare().getQueue();這樣就可以獲取隨機的隊列名字了,這個名字看起來形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
綁定:
我們已經創建了一個fanout類型的Exchange和一個隊列?,F在我們需要告訴Exchange發送消息到我們的隊列中。Exchange和隊列之間的關系稱為綁定。
channel.queueBind(queueName, "logs", "");?
這樣,我們創建的隊列就和我們創建的logs路由器建立了關系,路由器就會將消息發送到這個隊列中。
可以通過下面的命令查看所有已經存在的綁定關系:
rabbitmqctl list_bindings整合到一起:
對生產者程序,它輸出日志消息,與之前的教程并沒與很大不同。最重要的改變就是,我們將消息發布給logs路由器,而不是無名的路由的。當發消息的時候,我們需要提供一個路由鍵routingKey,但是它的值會被fanout類型的路由器忽略,以下是生產者EmitLog.java的完整代碼:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明路由以及路由的類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "msg...";//發布消息channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");//關閉連接和通道channel.close();connection.close();} }可以看到,在建立了連接之后,我們聲明了路由器Exchange。這一步是必須的,因為不允許將消息發給一個不存在的路由器。
如果路由器還沒有綁定隊列,這些發送給路由器的消息將會丟失。但這對我們無所謂,如果還沒有消費者監聽,我們可以安全地丟棄這些消息。
消費者ReceiveLogs.java的完整代碼如下:
import com.rabbitmq.client.*; import java.io.IOException;public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明路由器及類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//聲明一個隨機名字的隊列String queueName = channel.queueDeclare().getQueue();//綁定隊列到路由器上channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//開始監聽消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }現在,可以運行程序并查看結果了。首先運行兩個消費者實例ReceiveLogs.java,然后運行生產者EmitLog.java??纯磧蓚€消費者實例是不是都接收到了所有的消息。
//生產者[x] Sent 'msg...' //消費者1[*] Waiting for messages. To exit press CTRL+C[x] Received 'msg...' //消費者2[*] Waiting for messages. To exit press CTRL+C[x] Received 'msg...'可以看到,當生產者發出消息后,兩個消費者最終都收到了消息。(本例子中只是都將日志消息都輸出到控制臺,如果想讓其中一個消費者把日志消息輸出到文件中,請參考原文)。
為了驗證我們的代碼真正地將隊列和路由器綁定到了一起,可以使用rabbitmqctl list_bindings命令查看綁定關系,假定我們運行了兩個消費者,那么你應該可以看到如下的類似信息:
Listing bindings ... ... logs exchange amq.gen-S2B2k3mSnoLNNyUppS98Vw queue [] logs exchange amq.gen-ljp5AngXol4iPW649OY7Pw queue []從上面的結果可以看到,數據從logs路由器傳輸到兩個隨機名字的隊列中,這正是我們想要的。
想要了解如何監聽一部分消息,請看下一篇博客。
?
說明:
①與原文略有出入,如有疑問,請參閱原文
②原文均是編譯后通過javacp命令直接運行程序,我是在IDE中進行的,相應的操作做了修改。
總結
以上是生活随笔為你收集整理的RabbitMQ(三):Exchange交换器--fanout的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ(二):Work Que
- 下一篇: RabbitMQ(四):Exchange