RabbitMq 发布订阅 Publish/Subscribe fanout/direct
目錄
?
概述
交換機
臨時隊列
代碼
概述
在上篇中了解到rabbitmq 生產者生產消息到隊列,多個消費者可以接受。這篇文章主要記錄廣播類型為fanout。生產者不在將產生的消息發送到隊列,而是將消息發送到交換機exchange,交換機會根據不同的交換規則,將消息發送到不同的隊列。交換器必須知道她所接收的消息是什么?它應該將消息放到哪個隊列中或者還是應該丟棄?這些規則都是按照交換機的規則來確定的。
? ? ? ? ? ?
交換機
Exchange(交換機):生產者會將消息發送到交換機,然后交換機通過路由策略(規則)將消息路由到匹配的隊列中去
交換規則:
Fanout 不處理路由。需要簡單的將隊列綁定到交換機上。一個發送到該類型交換機的消息都會被廣播到與該交換機綁定的所有隊列上。(本篇文章)
direct:它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中。
channel.basicPublish(“direct”, “warn”, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
我們定義direct交換機,綁定路由warn 這時候發送消息只能發送的綁定的隊列中 如隊列1 隊列2 但是如果綁定路由為info 則只有隊列2可以收到。
topic:direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似。
定義
//參數1 名稱 參數2 類型 channel.exchangeDeclare("fanout", "fanout");臨時隊列
在生產者和消費者之間創建一個新的隊列,這時候又不想使用原來的隊列,臨時隊列就是為這個場景而生的:
首先,每當我們連接到RabbitMQ,我們需要一個新的空隊列,我們可以用一個隨機名稱來創建,或者說讓服務器選擇一個隨機隊列名稱給我們,一旦我們斷開消費者,隊列應該立即被刪除。
在Java客戶端,提供queuedeclare()為我們創建一個非持久化、獨立、自動刪除的隊列名稱。
隊列綁定
BindOk com.rabbitmq.client.Channel.queueBind(String queue, String exchange, String routingKey) throws IOExceptionBind a queue to an exchange, with no extra arguments.Parameters:queue the name of the queueexchange the name of the exchangeroutingKey the routine key to use for the binding Returns:a binding-confirm method if the binding was successfully created Throws:java.io.IOException - if an error is encountered代碼
該代碼為fanout模式
生產者
package com.ll.mq.hellomq.queue;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;/*** * @author ll 生產者**/ public class Producer {private static final String EXCHANGE_NAME = "fanoutStudy";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 分發消息for(int i = 0 ; i < 5; i++){String message = "Hello World! " + i;channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" send'" + message + "'");}channel.close();connection.close();}}消費者1
package com.ll.mq.hellomq.fanout;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /*** * @author ll 消費者1**/ public class ConsumerOne {private static final String EXCHANGE_NAME = "fanoutStudy";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 聲明交換機類型channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 獲取臨時隊列String queueName = channel.queueDeclare().getQueue();// 綁定channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" ConsumerOne '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }消費者2
package com.ll.mq.hellomq.fanout;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope;public class ConsumerTwo {private static final String EXCHANGE_NAME = "fanoutStudy";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明交換機類型channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//獲取臨時隊列String queueName = channel.queueDeclare().getQueue();//綁定channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" ConsumerTwo '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }結果:
生產者:?send'Hello World! 0'
? ? ? ? ? ? ? ?send'Hello World! 1'
? ? ? ? ? ? ? ?send'Hello World! 2'
? ? ? ? ? ? ? ?send'Hello World! 3'
? ? ? ? ? ? ? ?send'Hello World! 4'
消費者1? ??ConsumerOne 'Hello World! 0'
? ? ? ? ? ? ? ?ConsumerOne 'Hello World! 1'
? ? ? ? ? ? ? ?ConsumerOne 'Hello World! 2'
? ? ? ? ? ? ? ConsumerOne 'Hello World! 3'
? ? ? ? ? ? ? ConsumerOne 'Hello World! 4'
消費者2? ?ConsumerTwo'Hello World! 0'
? ? ? ? ? ? ? ?ConsumerTwo'Hello World! 1'
? ? ? ? ? ? ? ?ConsumerTwo'Hello World! 2'
? ? ? ? ? ? ? ConsumerTwo'Hello World! 3'
? ? ? ? ? ? ? ConsumerTwo'Hello World! 4'
rabbitmq結果:
?
下一篇?https://blog.csdn.net/lilongwangyamin/article/details/105117288?rabbitmq topic
總結
以上是生活随笔為你收集整理的RabbitMq 发布订阅 Publish/Subscribe fanout/direct的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 万字长文带你一文读完Effective
- 下一篇: golang的GUI库,使用go-fyn