5 交换机-direct (路由)
目錄
- 路由
- 1、綁定
- 2、direct 類型exchange
- 3、多重綁定
- 4、示例代碼
- 4.1、生產者
- 4.2、消費者
路由
上文中的exchange類型是fanout的,該類型的exchange會把消息發送給所有綁定到該exchange的queue
現在我們需要添加一個特性,只訂閱(subscribe)一部分指定的消息(Message)
1、綁定
在前面的例子中,我們已經在創建綁定。您可能會記得以下代碼:
// 綁定消息路由和消息隊列 // 第一個參數:隊列名稱 // 第二個參數:交換器名稱 channel.queueBind(queueName, EXCHANGE_NAME, "");綁定(bindings)是交換(exchange)和隊列(queue)之間的關系。這可以簡單地理解為:隊列對來自此交換的消息感興趣;
Binding可以使用一個已經存在的routingKey參數。為了避免和basic_publish參數混淆,我們稱之為binding key。下邊就是我們怎么用key來創建一個binding;
// 綁定消息路由和消息隊列 // 第一個參數:隊列名稱 // 第二個參數:交換器名稱 // 第三個參數:路由鍵 channel.queueBind(queueName, EXCHANGE_NAME, "");binding key的意義有時候取決于exchange的類型。對于Fanout類型的exchange,會忽略binding key
2、direct 類型exchange
上文中的日志系統會把所有的log消息廣播給所有的消費者。我們想擴展來根據他們的日志級別來過濾log消息。例如:我們只想把error級別的日志寫到磁盤文件中,而其它級別的日志消息則過濾掉
我們之前使用的fanout類型的exchange,但這樣就不會有太多的靈活性
在這里我們將要使用direct類型的exchange。Direct類型exchange的路由算法是很簡單的:要想一個消息能到達這個隊列,需要binding key和routing key正好能匹配得上。
為了說明這個道理,可以看看下邊的描述:
在這樣的結構中,我們可以看到direct類型的exchange X,有兩個queue綁定到它。第一個queue是以orange為binding key綁定到exchange X上的,第二個queue是由兩個binding key(black和green)綁定到exchange X的。
在這樣的設置中,一條消息被推送到exchange,如果使用的routing key是orange,那么消息就會被路由到Q1中;如果使用的routing key是black或green,那么該消息將會被路由到Q2中。其它的消息都將會被丟棄掉
3、多重綁定
用同一個binding來把多個queue綁定到同一個exchange也是可行的。例如在之前例子的基礎上,在X和Q1之間添加binding key名字為black,這樣的話,這里的direct類型的exchange就和fanout類型的一樣了,可以把消息推送給所有的queue。帶有routing key為black的消息將會被推送到Q1和Q2中
4、示例代碼
4.1、生產者
package com.demo.java.消息隊列.路由;import com.demo.utils.MqConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @author zhaodi* @description 路由* @date 2018/9/29 10:51*/ public class Producer {private static final String EXCHANGE_NAME = "my-exchange-2";private static final String ROUTE_KEY = "error";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = MqConnectionUtil.getConnection();Channel channel = connection.createChannel();// 申明交換器channel.exchangeDeclare("EXCHANGE_NAME", "direct");for (int i = 0; i < 30; i++) {String msg = "P--->INFO:" + i;channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, null, new String(msg).getBytes());System.out.println(msg);Thread.sleep(500);}channel.close();connection.close();} }4.2、消費者
package com.demo.java.消息隊列.路由;import com.demo.utils.MqConnectionUtil; import com.rabbitmq.client.*;import java.io.IOException;/*** @author zhaodi* @description 路由* @date 2018/9/29 10:52*/ public class ConsumerInfo {private static final String EXCHANGE_NAME = "my-exchange-2";private static final String ROUTE_KEY = "info";public static void main(String[] args) throws IOException {Connection connection = MqConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");String queueName = channel.queueDeclare().getQueue();// exchange和queue綁定// 第一個參數:隊列名字;第二個參數:交換機名稱;第三個參數:路由鍵channel.queueBind(queueName, EXCHANGE_NAME, ROUTE_KEY);com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body);System.out.println("c1--->:" + msg);channel.basicAck(envelope.getDeliveryTag(), false);}};boolean autoAck = false;channel.basicConsume(queueName, autoAck, consumer);} }轉載于:https://www.cnblogs.com/zhaod/p/11391476.html
總結
以上是生活随笔為你收集整理的5 交换机-direct (路由)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 4 交换机-fanout(订阅发布模式)
- 下一篇: 6 交换机-topic类型