RabbitMQ指南(中)
在上一篇文章中,介紹了使用RabbitMQ的Hello World例子, 以及如何創建一個work queue。在work queue的例子中每條消息都只會被傳遞到一個work queue中。 在這篇文章中我們將會學習另一種完全不同的傳遞消息的方式——每條消息將會被傳遞給所有的consumer,這種模式一般被稱為“發布/訂閱”。
發布/訂閱(Publish/Subscribe)
為了說明這種模式,我們將創建一個簡單的log系統,它由兩部分組成——第一部分負責發送log消息,第二部分負責接收并且將消息打印出來。 在我們的log系統中每個運行著的接收程序都會接收到消息,在這種方式下我們可以有一個consumer負責將log持久化到磁盤, 同時由另一個consumer來將log打印到控制臺。本質上,發送log消息是對所有消息接收者的廣播。
Exchange
在之前的部分我們都是通過queue來發送和接收消息,現在是時候來介紹RabbitMQ完整的消息模型了。先讓我們來快速地回顧一下之前介紹過的幾個概念:
- producer是用戶應用負責發送消息
- queue是存儲消息的緩沖(buffer)
- consumer是用戶應用負責接收消息
RabbitMQ的消息模型的核心思想是producer永遠不會直接發送任何消息到queue中,實際上,在很多情況下producer根本不知道一條消息是否被發送到了哪個queue中。
在RabbitMQ中,producer只能將消息發送到一個exchange中。要理解exchange也非常簡單,它一邊負責接收producer發送的消息, 另一邊將消息推送到queue中。exchange必須清楚的知道在收到消息之后該如何進行下一步的處理,比如是否應該將這條消息發送到某個queue中? 還是應該發送到多個queue中?還是應該直接丟棄這條消息等等。用官方文檔上的一張圖可以更清楚地了解RabbitMQ的消息模型。
RabbitMQ中的exchange類型有這么幾種:direct,topic,headers以及fanout。這一小節將會主要介紹最后一種類型——fanout。 使用RabbitMQ的client來創建一個fanout類型的exchange,命令為logs:
| 1 | channel.exchangeDeclare("logs","fanout"); |
fanout類型的exchange非常簡單,從名字也可以猜測出來,它會向所有的queue廣播所有收到的消息。這正是我們的log系統需要的。
在之前的部分我們對exchange一無所知,但是我們仍然可以將消息發送到queue中,這是因為我們使用了默認的exchange,在代碼中使用空字符串(“”)表示。
| 1 | channel.basicPublish("", "hello", null, message.getBytes()); |
第一個參數表示exchange的名字,使用空字符串表示使用默認的無名的exchange:如果有的話,消息將根據routingKey被發送到指定的queue中。
現在,可以將消息發送到之前已經聲明過的exchange中
| 1 | channel.basicPublish( "logs", "", null, message.getBytes()); |
臨時隊列
在之前的小節中使用queue都是指定了名字的(hello和task_queue),給queue命名是非常重要的,因為我們需要將的workers指定到相同的queue上, 并且在consumer與producer之間也需要指定相同的queue。
但是這對我們的log系統來說不是必須的,我們需要監聽所有的log消息,而不是其中的一部分。我們也只關心現在的消息而不關注以前的消息, 為了解決這個問題我們需要做兩件事情。
首先,無論何時連接到RabbitMQ server上都需要一個新的、空的queue。為了做到這一點需要能夠使用一個隨機的名字來創建queue, 更好的方式是由server來為我們選擇一個隨機的名字。
其次,一旦我們與consumer斷開連接,queue應該被自動刪除。
在Java client中,提供了一個無參數的queueDeclare()方來來創建一個非持久化的、獨有的并且是自動刪除的已命名的queue。
| 1 | String queueName = channel.queueDeclare().getQueue(); |
queueName會包含一個隨機的queue名字,可能看起來類似amq.gen-JzTY20BRgKO-HjmUJj0wLg。
綁定
我們已經創建了一個fanout類型的exchange和一個queue。現在我們需要告訴exchange將消息發送到我們的queue中。 這種exchange和queue的關系稱為綁定(binding)。
| 1 | channel.queueBind(queueName, "logs", ""); |
之后logs exchange將會把消息發送到我們的queue中。
完整的EmitLog.java代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public class EmitLog { ????private static final String EXCHANGE_NAME = "logs"; ????public static void main(String[] argv) ??????????????????throws java.io.IOException { ????????ConnectionFactory factory = new ConnectionFactory(); ????????factory.setHost("localhost"); ????????Connection connection = factory.newConnection(); ????????Channel channel = connection.createChannel(); ????????channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); ????????String message = getMessage(argv); ????????channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); ????????System.out.println(" [x] Sent '" + message + "'"); ????????channel.close(); ????????connection.close(); ????} ????//... } |
可以看到,在創建連接之后聲明exchange。這一步是必要的,因為將消息發送到一個不存在的exchange是被禁止的。
如果還沒有queue被綁定到exchange上,那么消息將會丟失,但這對我們來說是可以接收的,如果沒有consumer正在監聽消息, 那么可以安全的丟棄這些消息。
完整的ReceiveLogs.java代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public class ReceiveLogs { ??private static final String EXCHANGE_NAME = "logs"; ??public static void main(String[] argv) throws Exception { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????Connection connection = factory.newConnection(); ????Channel channel = connection.createChannel(); ????channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); ????String queueName = channel.queueDeclare().getQueue(); ????channel.queueBind(queueName, EXCHANGE_NAME, ""); ????System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ????Consumer consumer = new DefaultConsumer(channel) { ??????@Override ??????public void handleDelivery(String consumerTag, Envelope envelope, ?????????????????????????????????AMQP.BasicProperties properties, byte[] body) throws IOException { ????????String message = new String(body, "UTF-8"); ????????System.out.println(" [x] Received '" + message + "'"); ??????} ????}; ????channel.basicConsume(queueName, true, consumer); ??} } |
Routing
在上一小節中我們構建了一個簡單的log系統,可以向許多接收者廣播消息。在這一小節中我們將會對此增加一個特性——可以只訂閱消息的一部分。 舉例來說,可以只將critical級別的錯誤日志持久化到磁盤,同時又能夠將所有的消息打印到控制臺。
綁定(binds)
在前一小節中已經介紹了如何創建綁定
| 1 | channel.queueBind(queueName, EXCHANGE_NAME, ""); |
綁定是exchange和queue之間的一種關系,這可以簡單的理解為:這個queue對這個exchange中的消息感興趣。
綁定可以使用一個額外的routingKey參數,為了避免和basic_publish參數混淆,我們稱它為binding key。 我們可以這樣來使用key創建一個綁定:
| 1 | channel.queueBind(queueName, EXCHANGE_NAME, "black"); |
binding key的含義取決于不同的exchange類型,我們之前使用的fanout類型會直接忽略這個值。
Direct exchange
我們之前的log消息系統將所有的消息廣播到所有的consumer中。我們需要對此進行擴展,允許根據log的級別進行消息的過濾。 之前使用的fanout類型的exchange,沒有提供給我們類似的靈活性——它只能簡單的廣播所有的消息。
在這里將會使用direct類型的exchange作為代替。direct類型的exchange的路由算法很簡單——消息將會被傳遞到與它的routing key完全相同的?binding key的queue中。
還是使用一張圖來說明:
在圖中可以看到,有兩個queue被綁定到了direct類型的exchange X上。第一個queue使用bing key?orange綁定,第二個queue使用了兩個binding key, 分別為black和green。
在這樣的情況下,使用routing key為orange發送的消息將會被路由到queue?Q1中,使用routing key為black或者green的將會被路由到Q2中。 所有其他的消息將會被丟棄。
多重綁定(Multiple bindings)
將多個queue使用相同的binding key進行綁定也是可行的。在我們的例子中可以在X和Q1中間增加一個binding key?black。 在這種情況下,direct類型的exchange的行為將和fanout類似,它會向所有匹配的queue進行廣播,使用routing key為black發送的消息將會同時被Q1和Q2接收。
發送log
我們將會為log系統使用這種模型。使用direct類型的exchange代替fanout。我們將會通過routing key提供log的嚴重級別。 使用這種方式可以選擇不同的log嚴重級別來接收消息。首先來看發送log的部分。
創建一個exchange:
| 1 | channel.exchangeDeclare(EXCHANGE_NAME, "direct"); |
已經準備好發送消息:
| 1 | channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); |
為了簡單起見,我們假設日志的級別只會為’info,’warning’,’error’三者中的一個。
訂閱
接受消息部分將會和上一小節相同,除了一個例外——我們將會為每個感興趣的嚴重級別創建新的綁定。
| 1 2 3 4 5 | String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ ??channel.queueBind(queueName, EXCHANGE_NAME, severity); } |
完整的EmitLogDirect.java代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | public class EmitLogDirect { ????private static final String EXCHANGE_NAME = "direct_logs"; ????public static void main(String[] argv) ??????????????????throws java.io.IOException { ????????ConnectionFactory factory = new ConnectionFactory(); ????????factory.setHost("localhost"); ????????Connection connection = factory.newConnection(); ????????Channel channel = connection.createChannel(); ????????channel.exchangeDeclare(EXCHANGE_NAME, "direct"); ????????String severity = getSeverity(argv); ????????String message = getMessage(argv); ????????channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); ????????System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); ????????channel.close(); ????????connection.close(); ????} ????//.. } |
完整的ReceiveLogsDirect.java代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | public class ReceiveLogsDirect { ??private static final String EXCHANGE_NAME = "direct_logs"; ??public static void main(String[] argv) throws Exception { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????Connection connection = factory.newConnection(); ????Channel channel = connection.createChannel(); ????channel.exchangeDeclare(EXCHANGE_NAME, "direct"); ????String queueName = channel.queueDeclare().getQueue(); ????if (argv.length < 1){ ??????System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); ??????System.exit(1); ????} ????for(String severity : argv){ ??????channel.queueBind(queueName, EXCHANGE_NAME, severity); ????} ????System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ????Consumer consumer = new DefaultConsumer(channel) { ??????@Override ??????public void handleDelivery(String consumerTag, Envelope envelope, ?????????????????????????????????AMQP.BasicProperties properties, byte[] body) throws IOException { ????????String message = new String(body, "UTF-8"); ????????System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); ??????} ????}; ????channel.basicConsume(queueName, true, consumer); ??} } |
可以在命令行中傳入感興趣的日志的嚴重級別來綁定。
from: http://www.importnew.com/24324.html
總結
以上是生活随笔為你收集整理的RabbitMQ指南(中)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ指南(上)
- 下一篇: RabbitMQ指南(下)