RabbitMq topic
目錄
概述
代碼
結果
概述
上篇direct必須是生產者發布消息指定的routingKey和消費者在隊列綁定時指定的routingKey完全相等時才能匹配到隊列上。topic與direct不同,可以進行模糊匹配。topic交換機的消息不能隨意的設置選擇routingKey,必須是由點隔開的一系列的標識符組成。標識符可以是任何東西,但是一般都與消息的某些特性相關。例子:“link.orange.rabbit”,你可以定義任何數量的標識符,上限為255個字節
匹配規則:
*??其中星號可以代替一個單詞
#? 可以替代零個或更多的單詞,只要能模糊匹配上就能將消息映射到隊列中。當一個隊列的綁定鍵為#的時候,這個隊列將會無視消息的路由鍵,接收所有的消息
注:隊列1 將接受warn相關的消息
? ? ? 隊列2 將接受info 相關消息 debugger所有消息
代碼
生產者
package com.ll.mq.hellomq.topic;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;/*** * @author ll 生產者**/ public class Producer {public static void main(String[] args) {try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// Routing 的路由規則使用直連接String EXCHANGE_NAME = "exchange.topic";String[] routingKeys = { "link.warn.rabbit", "link.warn.elephant", "debugger.info"};for (String routingKey : routingKeys) {String message = "Hello RabbitMQ - " + routingKey;System.out.println(message);channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));}// 關閉資源channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}} }消費者1
package com.ll.mq.hellomq.topic;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /*** * @author ll 消費者1**/ public class ConsumerOne {static String EXCHANGE_NAME = "exchange.topic";static String QUEUE_NAME = "queue.topic.q1";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 聲明交換機類型channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.exchangeDeclare(EXCHANGE_NAME, "topic");String[] routingKeys = {"*.warn.*"};for (int i = 0; i < routingKeys.length; i++) {channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);}Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [C] ConsumerOne '" + message + "', 接受...");}};channel.basicConsume(QUEUE_NAME, true, consumer);} }消費者2
package com.ll.mq.hellomq.topic;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /*** * @author ll 消費者2**/ public class ConsumerTwo {static String EXCHANGE_NAME = "exchange.topic";static String QUEUE_NAME = "queue.topic.q2";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 聲明交換機類型channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.exchangeDeclare(EXCHANGE_NAME, "topic");String[] routingKeys = {"debugger.#"};for (int i = 0; i < routingKeys.length; i++) {channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);}Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [C] ConsumerTwo '" + message + "', 接受中...");}};channel.basicConsume(QUEUE_NAME, true, consumer);} }?
結果
生產者:Hello RabbitMQ - link.warn.rabbit
? ? ? ? ? ? ? Hello RabbitMQ - link.warn.elephant
? ? ? ? ? ? ? Hello RabbitMQ - debugger.info
消費者1??[C] ConsumerOne 'Hello RabbitMQ - link.warn.rabbit', 接受...
? ? ? ? ? ? ? ?[C] ConsumerOne 'Hello RabbitMQ - link.warn.elephant', 接受...
消費者2??[C] ConsumerTwo 'Hello RabbitMQ - debugger.info', 接受中...
下一篇?https://blog.csdn.net/lilongwangyamin/article/details/105118955?rabbitmq rpc遠程過程調用?
總結
以上是生活随笔為你收集整理的RabbitMq topic的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 移动应用开发——作业3
- 下一篇: 程序员的十个糟糕的行为