rabbitmq-路由模式-routingkey
生活随笔
收集整理的這篇文章主要介紹了
rabbitmq-路由模式-routingkey
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
【README】
本文po出 rabbitmq路由模式;
?
【1】intro to 路由模式
特點(diǎn)1)隊(duì)列與交換機(jī)的綁定,不能是任意綁定, 而是指定一個(gè)路由key-routingkey;
特點(diǎn)2)消息的發(fā)送方向在向 exchange-交換機(jī)發(fā)送消息時(shí),也必須指定消息的routingkey;
特點(diǎn)3)exchange-交換機(jī)不再把消息發(fā)送給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的routingkey發(fā)送到對應(yīng)的隊(duì)列;
與發(fā)布訂閱模式不同,路由模式的交換機(jī)類型是 Direct,還有隊(duì)列綁定交換機(jī)的時(shí)候需要指定routingkey;?
【2】代碼
生產(chǎn)者
/*** 路由模式生產(chǎn)者*/ public class RouteProducer {/* 交換機(jī)名稱 */static final String DIRECT_EXCHANGE = "direct_exchange"; /*隊(duì)列名稱1*/ static final String ROUTE_QUEUE_INSERT = "route_queue_insert";/*隊(duì)列名稱2*/static final String ROUTE_QUEUE_UPDATE = "route_queue_update";public static void main(String[] args) throws Exception {/*獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建頻道 Channel channel = conn.createChannel();/*** 聲明交換機(jī)* 參數(shù)1-交換機(jī)名稱 * 參數(shù)2-交換機(jī)類型(fanout, topic, direct, headers)*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT); /*** 聲明隊(duì)列* 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 是否定義持久化隊(duì)列 * 參數(shù)3 是否獨(dú)占本次連接 * 參數(shù)4 是否在不使用的時(shí)候自動刪除隊(duì)列* 參數(shù)5 隊(duì)列其他參數(shù) */channel.queueDeclare(ROUTE_QUEUE_INSERT, true, false, false, null);channel.queueDeclare(ROUTE_QUEUE_UPDATE, true, false, false, null);/*** routingkey-路由鍵*/String insertRoutingKey = "insert";String updateRoutingKey = "update";/*** 隊(duì)列綁定交換機(jī)* 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 交換機(jī) * 參數(shù)3 routingkey-路由鍵 */channel.queueBind(ROUTE_QUEUE_INSERT, DIRECT_EXCHANGE, insertRoutingKey);channel.queueBind(ROUTE_QUEUE_UPDATE, DIRECT_EXCHANGE, updateRoutingKey);/* 發(fā)送消息-insert */ String insertMsg = "我是消息,路由模式routingkey=" + insertRoutingKey + MyDateUtil.getNow();/*** 參數(shù)1 交換機(jī)名稱 如果沒有指定則使用默認(rèn) default exchange * 參數(shù)2 routingkey-路由key, 簡單模式可以傳遞隊(duì)列名稱 * 參數(shù)3 消息其他屬性* 參數(shù)4 消息內(nèi)容 */channel.basicPublish(DIRECT_EXCHANGE, insertRoutingKey, null, insertMsg.getBytes());System.out.println("已發(fā)送消息=" + insertMsg); /* 發(fā)送消息-update */String updateMsg = "我是消息,路由模式routingkey=" + updateRoutingKey + MyDateUtil.getNow();channel.basicPublish(DIRECT_EXCHANGE, updateRoutingKey, null, updateMsg.getBytes());System.out.println("已發(fā)送消息=" + updateMsg); /* 關(guān)閉連接和信道 */ channel.close();conn.close(); } }消費(fèi)者-insert
/*** 路由模式消費(fèi)者-routingkey */ public class RouteConsumerInsert {/* 交換機(jī)名稱 */static final String DIRECT_EXCHANGE = "direct_exchange"; /*隊(duì)列名稱1*/ static final String ROUTE_QUEUE_INSERT = "route_queue_insert";/*隊(duì)列名稱2*/static final String ROUTE_QUEUE_UPDATE = "route_queue_update";public static void main(String[] args) throws Exception {/*創(chuàng)建連接 */Connection conn = RBConnectionUtil.getConn();/*創(chuàng)建隊(duì)列*/Channel channel = conn.createChannel(); /*聲明交換機(jī)*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);/*** routingkey-路由鍵*/String insertRoutingKey = "insert";String updateRoutingKey = "update";/*** 聲明/創(chuàng)建隊(duì)列 * 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 是否持久化* 參數(shù)3 是否獨(dú)占本連接 * 參數(shù)4 是否在不使用的時(shí)候自動刪除隊(duì)列* 參數(shù)5 隊(duì)列其他參數(shù) */channel.queueDeclare(ROUTE_QUEUE_INSERT, true, false, false, null);/*** 隊(duì)列綁定交換機(jī)* 參數(shù)1 隊(duì)列名稱* 參數(shù)2 交換機(jī)* 參數(shù)3 routingkey-路由鍵 */channel.queueBind(ROUTE_QUEUE_INSERT, DIRECT_EXCHANGE, insertRoutingKey);/* 創(chuàng)建消費(fèi)者,設(shè)置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費(fèi)者標(biāo)簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內(nèi)容,包括消息id,消息routingkey,交換機(jī),消息和重轉(zhuǎn)標(biāo)記(收到消息失敗后是否需要重新發(fā)送) * @param properties 基本屬性* @param body 消息字節(jié)數(shù)組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費(fèi)者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機(jī)=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費(fèi)者收到的消息【%s】", message)); System.out.println("=== 消費(fèi)者1 end ===\n"); } };/*** 監(jiān)聽消息 * 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 是否自動確認(rèn), 設(shè)置為true表示消息接收到自動向 mq回復(fù)ack;mq收到ack后會刪除消息; 設(shè)置為false則需要手動發(fā)送ack; * 參數(shù)3 消息接收后的回調(diào) */channel.basicConsume(ROUTE_QUEUE_INSERT, true, consumer); } }消費(fèi)者-update
/*** 路由模式消費(fèi)者-routingkey */ public class RouteConsumerUpdate {/* 交換機(jī)名稱 */static final String DIRECT_EXCHANGE = "topic_exchange"; /*隊(duì)列名稱1*/ static final String ROUTE_QUEUE_INSERT = "route_queue_insert";/*隊(duì)列名稱2*/static final String ROUTE_QUEUE_UPDATE = "route_queue_update";public static void main(String[] args) throws Exception {/*創(chuàng)建連接 */Connection conn = RBConnectionUtil.getConn();/*創(chuàng)建隊(duì)列*/Channel channel = conn.createChannel(); /*聲明交換機(jī)*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);/*** routingkey-路由鍵*/String updateRoutingKey = "update";/*** 聲明/創(chuàng)建隊(duì)列 * 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 是否持久化* 參數(shù)3 是否獨(dú)占本連接 * 參數(shù)4 是否在不使用的時(shí)候自動刪除隊(duì)列* 參數(shù)5 隊(duì)列其他參數(shù) */channel.queueDeclare(ROUTE_QUEUE_UPDATE, true, false, false, null);/*** 隊(duì)列綁定交換機(jī)* 參數(shù)1 隊(duì)列名稱* 參數(shù)2 交換機(jī)* 參數(shù)3 routingkey-路由鍵 */channel.queueBind(ROUTE_QUEUE_UPDATE, DIRECT_EXCHANGE, updateRoutingKey);/* 創(chuàng)建消費(fèi)者,設(shè)置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費(fèi)者標(biāo)簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內(nèi)容,包括消息id,消息routingkey,交換機(jī),消息和重轉(zhuǎn)標(biāo)記(收到消息失敗后是否需要重新發(fā)送) * @param properties 基本屬性* @param body 消息字節(jié)數(shù)組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費(fèi)者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機(jī)=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費(fèi)者收到的消息【%s】", message)); System.out.println("=== 消費(fèi)者1 end ===\n"); } };/*** 監(jiān)聽消息 * 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 是否自動確認(rèn), 設(shè)置為true表示消息接收到自動向 mq回復(fù)ack;mq收到ack后會刪除消息; 設(shè)置為false則需要手動發(fā)送ack; * 參數(shù)3 消息接收后的回調(diào) */channel.basicConsume(ROUTE_QUEUE_UPDATE, true, consumer); } }?
?
總結(jié)
以上是生活随笔為你收集整理的rabbitmq-路由模式-routingkey的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: rabbitmq-发布订阅模式
- 下一篇: 怎么查看一个网站页面的seo优化情况(如