生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ 四种Exchange
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然后Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就類似于一個交換機,轉發各個消息分發到相應的隊列中。
?
RabbitMQ提供了四種Exchange模式:fanout、direct、topic、header 。?header模式在實際使用中較少。
Direct Exchange ?– 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。? ?
Java代碼??
Channel?channel?=?connection.createChannel();?? channel.exchangeDeclare("exchangeName",?"direct");? channel.queueDeclare("queueName");?? channel.queueBind("queueName",?"exchangeName",?"routingKey");?? ?? byte[]?messageBodyBytes?=?"hello?world".getBytes();?? channel.basicPublish("exchangeName",?"routingKey",?MessageProperties.PERSISTENT_TEXT_PLAIN,?messageBodyBytes);?? Fanout Exchange ?– 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。Fanout交換機轉發消息是最快的。? ?
Java代碼??
Channel?channel?=?connection.createChannel();?? channel.exchangeDeclare("exchangeName",?"fanout");? channel.queueDeclare("queueName");?? channel.queueBind("queueName",?"exchangeName",?"routingKey");?? ?? channel.queueDeclare("queueName1");?? channel.queueBind("queueName1",?"exchangeName",?"routingKey1");?? ?? byte[]?messageBodyBytes?=?"hello?world".getBytes();?? channel.basicPublish("exchangeName",?"",?MessageProperties.PERSISTENT_TEXT_PLAIN,?messageBodyBytes);?? Topic Exchange? – 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。我在RedHat的朋友做了一張不錯的圖,來表明topic交換機是如何工作的:? ?
Java代碼??
Channel?channel?=?connection.createChannel();?? channel.exchangeDeclare("exchangeName",?"topic");? channel.queueDeclare("queueName");?? channel.queueBind("queueName",?"exchangeName",?"routingKey.*");?? byte[]?messageBodyBytes?=?"hello?world".getBytes();?? channel.basicPublish("exchangeName",?"routingKey.one",?MessageProperties.PERSISTENT_TEXT_PLAIN,?messageBodyBytes);?? header exchange:
Headers類型的exchange使用的比較少,它也是忽略routingKey的一種路由方式。是使用Headers來匹配的。Headers是一個鍵值對,可以定義成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值"x-mactch"來定義。all代表定義的多個鍵值對都要滿足,而any則代碼只要滿足一個就可以了。fanout,direct,topic?exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。
1.生產者Producer.Java
?
[java] ?view plaincopy print?
package?cn.slimsmart.rabbitmq.demo.headers;?? ?? import?java.util.Date;?? import?java.util.Hashtable;?? import?java.util.Map;?? ?? import?org.springframework.amqp.core.ExchangeTypes;?? ?? import?com.rabbitmq.client.AMQP;?? import?com.rabbitmq.client.AMQP.BasicProperties;?? import?com.rabbitmq.client.AMQP.BasicProperties.Builder;?? import?com.rabbitmq.client.Channel;?? import?com.rabbitmq.client.Connection;?? import?com.rabbitmq.client.ConnectionFactory;?? ?? public?class?Producer?{?? ????private?final?static?String?EXCHANGE_NAME?=?"header-exchange";?? ?????? ????@SuppressWarnings("deprecation")?? ????public?static?void?main(String[]?args)?throws?Exception?{?? ???????? ????????ConnectionFactory?factory?=?new?ConnectionFactory();?? ????????factory.setHost("192.168.36.102");?? ???????? ????????factory.setUsername("admin");?? ????????factory.setPassword("admin");?? ???????? ????????factory.setPort(AMQP.PROTOCOL.PORT);?? ????????Connection?connection?=?factory.newConnection();?? ????????Channel?channel?=?connection.createChannel();?? ?????????? ???????? ????????channel.exchangeDeclare(EXCHANGE_NAME,?ExchangeTypes.HEADERS,false,true,null);?? ????????String?message?=?new?Date().toLocaleString()?+?"?:?log?something";?? ?????????? ????????Map<String,Object>?headers?=??new?Hashtable<String,?Object>();?? ????????headers.put("aaa",?"01234");?? ????????Builder?properties?=?new?BasicProperties.Builder();?? ????????properties.headers(headers);?? ?????????? ???????? ????????channel.basicPublish(EXCHANGE_NAME,?"",properties.build(),message.getBytes());?? ?????????? ????????System.out.println("Sent?message?:'"?+?message?+?"'");?? ????????channel.close();?? ????????connection.close();?? ????}?? }?? 2.消費者Consumer.java
?
?
[java] ?view plaincopy print?
package?cn.slimsmart.rabbitmq.demo.headers;?? ?? import?java.util.Hashtable;?? import?java.util.Map;?? ?? import?org.springframework.amqp.core.ExchangeTypes;?? ?? import?com.rabbitmq.client.AMQP;?? import?com.rabbitmq.client.Channel;?? import?com.rabbitmq.client.Connection;?? import?com.rabbitmq.client.ConnectionFactory;?? import?com.rabbitmq.client.QueueingConsumer;?? ?? public?class?Consumer?{?? ????private?final?static?String?EXCHANGE_NAME?=?"header-exchange";?? ????private?final?static?String?QUEUE_NAME?=?"header-queue";?? ?????? ????public?static?void?main(String[]?args)?throws?Exception?{?? ???????? ????????ConnectionFactory?factory?=?new?ConnectionFactory();?? ????????factory.setHost("192.168.36.102");?? ???????? ????????factory.setUsername("admin");?? ????????factory.setPassword("admin");?? ???????? ????????factory.setPort(AMQP.PROTOCOL.PORT);?? ????????Connection?connection?=?factory.newConnection();?? ????????Channel?channel?=?connection.createChannel();?? ?????????? ???????? ????????channel.exchangeDeclare(EXCHANGE_NAME,?ExchangeTypes.HEADERS,false,true,null);?? ????????channel.queueDeclare(QUEUE_NAME,false,?false,?true,null);?? ?????????? ????????Map<String,?Object>?headers?=?new?Hashtable<String,?Object>();?? ????????headers.put("x-match",?"any"); ????????headers.put("aaa",?"01234");?? ????????headers.put("bbb",?"56789");?? ???????? ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,"",?headers);?? ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);?? ???????? ????????channel.basicConsume(QUEUE_NAME,?true,?consumer);?? ????????while?(true)?{?? ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();?? ????????????String?message?=?new?String(delivery.getBody());?? ????????????System.out.println(message);?? ????????}??? ????}?? }?? ?
實例代碼:http://download.csdn.net/detail/tianwei7518/8136413
總結
以上是生活随笔 為你收集整理的RabbitMQ 四种Exchange 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。