RabbitMQ(四):Exchange交换器--direct
內(nèi)容翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程序
RabbitMQ(二):Work Queues、循環(huán)分發(fā)、消息確認、持久化、公平分發(fā)
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調(diào)隊列callback queue、關(guān)聯(lián)標識correlation id、實現(xiàn)簡單的RPC系統(tǒng)
RabbitMQ(七):常用方法說明 與 學(xué)習(xí)小結(jié)
Routing:
在上一篇博客中,我們創(chuàng)建了一個簡單的日志系統(tǒng)。我們可以將日志消息廣播給所有的接收者(消費者)。
在這個教程中,我們將為我們的日志系統(tǒng)添加一個功能:僅僅訂閱一部分消息。比如,我們可以直接將關(guān)鍵的錯誤類型日志消息保存到日志文件中,還可以同時將所有的日志消息打印到控制臺。
綁定(Bindings):
在之前的例子中,我們已經(jīng)創(chuàng)建了綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "");一個綁定是建立在一個隊列和一個路由器之間的關(guān)系,可以解讀為:該隊列對這個路由器中的消息感興趣。
綁定可以設(shè)置另外的參數(shù):路由鍵routingKey。為了避免和void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)中的routingKey混淆,我們將這里的key稱為綁定鍵binding key,下面的代碼展示了如何使用綁定鍵來創(chuàng)建一個綁定關(guān)系:
channel.queueBind(queueName, EXCHANGE_NAME, "black");綁定鍵的含義取決于路由器的類型,我們之前使用的fanout類型路由器會忽略該值。
直接路由器 (Direct Exchange):
我們之前的日志系統(tǒng)會將所有消息廣播給所有消費者。現(xiàn)在我們想根據(jù)日志的嚴重程度來過濾日志。比如,我們想要一個程序來將error日志寫到磁盤文件中,而不要將warning或info日志寫到磁盤中,以免浪費磁盤空間。
我們之前使用的fanout路由器缺少靈活性,它只是沒頭腦地廣播消息。所以,我們用direct路由器來替換它。direct路由器背后的路由算法很簡單:只有當消息的路由鍵routing key與隊列的綁定鍵binding key完全匹配時,該消息才會進入該隊列。
為了演示上面拗口的表述中的意思,考慮下面的設(shè)置:
上圖中,直接路由器X與兩個隊列綁定。第一個隊列以綁定鍵orange來綁定,第二個隊列以兩個綁定鍵black和green和路由器綁定。
按照這種設(shè)置,路由鍵為orange的消息在發(fā)布給路由器后,將會被路由到隊列Q1,路由鍵為black或者green的消息將會路由到隊列Q2。
多重綁定(Multiple bindings):
多個隊列以相同的綁定鍵binding key綁定到同一個Exchange上,是完全可以的。按照這種方式設(shè)置的話,直接路由器就會像fanout路由器一樣,將消息廣播給所有符合路由規(guī)則的隊列。一個路由鍵為black的消息將會發(fā)布到隊列Q1和Q2。
發(fā)布消息:
在這個教程中,我們使用direct路由器來代替上個教程中的fanout路由器。同時,我們?yōu)槿罩驹O(shè)置嚴重級別,并將此作為路由鍵。這樣,接收者(消費者)就可以選擇性地接收日志消息。
首先,創(chuàng)建一個路由器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");接著,發(fā)送一個消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());簡單起見,我們假設(shè)severity只能是info、warning 、error中的一種。
消息訂閱:
接收消息將會和之前的教程類似,只是我們會為每一個級別的消息來創(chuàng)建不同的綁定:
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity); }放在一塊:
生產(chǎn)者EmitLogDirect.java的完整代碼:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {//創(chuàng)建連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明路由器和路由器的類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String severity = "info";String message = ".........i am msg.........";//發(fā)布消息channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + severity + "':'" + message + "'");channel.close();connection.close();}}消費者ReceiveLogsDirect.java的完整代碼如下:
import com.rabbitmq.client.*; import java.io.IOException;public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明路由器和類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明隊列String queueName = channel.queueDeclare().getQueue();//定義要監(jiān)聽的級別String[] severities = {"info", "warning", "error"};//根據(jù)綁定鍵綁定for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }現(xiàn)在可以進行測試了。首先,啟動一個消費者實例(ReceiveLogsDirect.java),然后將其中的要監(jiān)聽的級別改為String[] severities = {"error"};,再啟動另一個消費者實例。此時,這兩個消費者都開始監(jiān)聽了,一個監(jiān)聽所有級別的日志消息,另一個監(jiān)聽error日志消息。
然后,啟動生產(chǎn)者(EmitLogDirect.java),之后將String severity = "info";中的info,分別改為warning、error后運行。
這樣,就可以在控制臺看到如下輸出:
?
說明:
①與原文略有出入,如有疑問,請參閱原文
②原文均是編譯后通過javacp命令直接運行程序,我是在IDE中進行的,相應(yīng)的操作做了修改。
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ(四):Exchange交换器--direct的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ(三):Exchange
- 下一篇: RabbitMQ(五):Exchange