RabbitMQ之主题(Topic)
為什么80%的碼農都做不了架構師?>>> ??
上一篇博客中,我們進一步改良了我們的日志系統。我們使用direct類型轉發器,使得接收者有能力進行選擇性的接收日志,而非fanout那樣只能夠無腦轉發。
雖然使用direct類型改良了我們的系統,但是仍然存在一些局限性:它不能夠基于多重條件進行路由選擇。在我們的日志系統中,我們有可能希望不僅根據日志的級別而且想根據日志的來源進行訂閱。這個根據類似于unix工具:syslog,它轉發日志基于嚴重性(info/warning/crit...)和設備(auth/cron/kern...)。這樣可能給我們更多的靈活性:我們可能只想訂閱來自'cron'的致命錯誤日志,而不是來自'kern'的。為了在我們的系統中實現上述的需求,我們需要學習稍微復雜的主題類型的轉發器(topic exchange)。
1. 主題轉發(Topic Exchange)
發往主題類型的轉發器的消息不能隨意的設置選擇鍵(routing key),必須是由點隔開的一系列的標識符組成。標識符可以是任何東西,但是一般都與消息的某些特性相關。一些合法的選擇鍵的例子:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。你可以定義任何數量的標識符,上限為255個字節。
綁定鍵和選擇鍵的形式一樣。主題類型的轉發器背后的邏輯和直接類型的轉發器很類似:一個附帶特殊的選擇鍵將會被轉發到綁定鍵與之匹配的隊列中。需要注意的是:關于綁定鍵有兩種特殊的情況:
*可以匹配一個標識符。
#可以匹配0個或多個標識符。
2. 圖解
我們準備發送關于動物的消息。消息會附加一個選擇鍵包含3個標識符(兩個點隔開)。第一個標識符描述動物的速度,第二個標識符描述動物的顏色,第三個標識符描述動物的物種:<speed>.<color>.<species>。
我們創建3個綁定鍵:Q1與*.orange.*綁定,Q2與*.*.rabbit和lazy.#綁定。可以簡單的認為:
Q1對所有的橙色動物感興趣。
Q2想要知道關于兔子的一切以及關于懶洋洋的動物的一切。
一個附帶quick.orange.rabbit的選擇鍵的消息將會被轉發到兩個隊列。附帶lazy.orange.elephant的消息也會被轉發到兩個隊列。另一方面quick.orange.fox只會被轉發到Q1,lazy.brown.fox將會被轉發到Q2。lazy.pink.rabbit雖然與兩個綁定鍵匹配,但是也只會被轉發到Q2一次。quick.brown.fox不能與任何綁定鍵匹配,所以會被丟棄。如果我們違反我們的約定,發送一個或者四個標識符的選擇鍵,類似:orange,quick.orange.male.rabbit,這些選擇鍵不能與任何綁定鍵匹配,所以消息將會被丟棄。另一方面,lazy.orange.male.rabbit,雖然是四個標識符,也可以與lazy.#匹配,從而轉發至Q2。
注:主題類型的轉發器非常強大,可以實現其他類型的轉發器。當一個隊列與綁定鍵#綁定,將會收到所有的消息,類似fanout類型轉發器。當綁定鍵中不包含任何#與*時,類似direct類型轉發器。
3. 完整的例子
發送端:EmitLogTopic.java
package?cc.openscanner;import?java.util.UUID; import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;public?class?EmitLogTopic?{private?static?final?String?EXCHANGE_NAME?=?"topic_logs";public?static?void?main(String[]?args)?throws?Exception?{//創建連接和通道ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"topic");String[]?routing_keys?=?new?String[]{"kernal.info","cron.warning","auth.info","kernel.critical"};for(String?routing_key?:?routing_keys){String?msg?=?UUID.randomUUID().toString();channel.basicPublish(EXCHANGE_NAME,?routing_key,?null,?msg.getBytes());System.out.println("?[x]?Sent?routingKey?=?"?+?routing_key?+?"?,msg?=?"?+?msg?+?".");}channel.close();connection.close();} }我們發送了4條消息,分別設置了不同的選擇鍵。
接收端1,ReceiveLogsTopicForKernel.java
package?cc.openscanner;import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.QueueingConsumer;public?class?ReceiveLogsTopicForKernel?{private?static?final?String?EXCHANGE_NAME?=?"topic_logs";public?static?void?main(String[]?args)?throws?Exception{//創建連接和通道ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//聲明轉發器channel.exchangeDeclare(EXCHANGE_NAME,?"topic");//隨機生成一個隊列String?queueName?=?channel.queueDeclare().getQueue();channel.queueBind(queueName,?EXCHANGE_NAME,?"kernel.*");System.out.println("?[*]?Waiting?for?messages?about"+"?kernel.?To?exit?press?CTRL+C");QueueingConsumer?consumer?=?new?QueueingConsumer(channel);channel.basicConsume(queueName,?true,consumer);while(true){QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();String?message?=?new?String(delivery.getBody());String?routingKey?=?delivery.getEnvelope().getRoutingKey();System.out.println("?[x]?Received?routingKey?=?"?+?routingKey?+?",msg?=?"?+?message?+?".");}} }只接收和Kernel相關的日志消息。
接收端2:ReceiveLogsTopicForCritical.java
package?cc.openscanner;import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.QueueingConsumer;public?class?ReceiveLogsTopicForCritical?{private?static?final?String?EXCHANGE_NAME?=?"topic_logs";public?static?void?main(String[]?args)?throws?Exception{//創建連接和通道ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//聲明轉發器channel.exchangeDeclare(EXCHANGE_NAME,?"topic");//隨機生成一個隊列String?queueName?=?channel.queueDeclare().getQueue();//接收所有與kernel相關的消息channel.queueBind(queueName,?EXCHANGE_NAME,?"*.critical");System.out.println("?[*]?Waiting?for?critical?messages.?"+"To?exit?press?CTRL+C");QueueingConsumer?consumer?=?new?QueueingConsumer(channel);channel.basicConsume(queueName,true,consumer);while(true){QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();String?message?=?new?String(delivery.getBody());String?routingKey?=?delivery.getEnvelope().getRoutingKey();System.out.println("?[x]?Received?routingKey?=?"?+?routingKey?+?",msg?=?"?+?message?+?".");}} }只接收致命錯誤的日志消息。
運行結果:
[x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7.
?[x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281.
?[x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a.
?[x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.
--------------------------------------------------------------------------------------------------------------------
?[*] Waiting for messages about kernel. To exit press CTRL+C
?[x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.
--------------------------------------------------------------------------------------------------------------------
?[*] Waiting for critical messages. To exit press CTRL+C
?[x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.
可以看到,我們通過使用topic類型的轉發器,成功實現了多重條件選擇的訂閱。
轉載于:https://my.oschina.net/fhd/blog/376252
總結
以上是生活随笔為你收集整理的RabbitMQ之主题(Topic)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 《Objective-c》-(第一个OC
- 下一篇: 【OpenCV归纳】4 关于HighGU