RabbitMQ指南(下)
在上一小節(jié)中我們改進了log系統(tǒng),由于使用fanout類型的exchange只能進行全局的廣播,因此我們使用direct類型的exchange做了代替, 使得我們可以選擇性的接收消息。盡管使用fanout exchange改進了log系統(tǒng),但它仍然有限制——不能基于多個條件做路由。
Topics
在log系統(tǒng)中可能不只是基于不同的日志級別作訂閱,也可能會基于日志的來源。你也許聽過Unix下名為syslog的工具, 它把日志按照嚴重級別(info/warn/crit…)和設(shè)備(auth/cron/ker…)進行路由。
這會給我們許多的靈活性,也許我們只想監(jiān)聽’cron’中的’critical’級別的錯誤日志,以及所有’kern’中的日志。 為了實現(xiàn)這種日志系統(tǒng),我們需要學習一個更復雜的topic類型的exchange。
Topic exchange
發(fā)送到topic exchange中的消息不能有一個任意的routing_key——它必須是一個使用點分隔的單詞列表。單詞可以是任意的, 但是通常會指定消息的一些特定。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。 routing key的長度限制為255個字節(jié)數(shù)。
binding key也必須是相同的形式。topic exchange背后的邏輯類似于direct——一條使用特定的routing key發(fā)送的消息將會被傳遞至所有使用與該routing key相同的binding key進行綁定的隊列中。 然而,對binding key來說有兩種特殊的情況:
使用一張圖可以很簡單地來說明:
在圖中,我們將要發(fā)送被描述的動物的消息。消息的routing key將由三個單詞組成(通過兩個點分隔)。routing key中的第一個單詞將描述速度, 第二個是顏色,第三個是物種:"<speed>.<colour>.<species>"。
我們創(chuàng)建三個綁定:Q1使用binding key"*.orange.*"來綁定,Q2使用"*.*.rabbit"以及l(fā)azy.#綁定。
這些綁定可以被總結(jié)為:
- Q1對所有橘色的的動物感興趣
- Q2想要接收所有關(guān)于兔子的消息以及所有關(guān)于lazy的動物的消息
一條使用routing key"quick.orange.rabbit"發(fā)送的消息將被同時傳遞到兩個隊列中。消息"lazy.orange.elephant"同樣如此。 另一方面,"quick.orange.fox"只會被第一個queue接收,"lazy.brown.fox"只會被第二個queue接收。?"lazy.pink.rabbit"只會被傳遞到Q2一次,即使它對兩個binding key都匹配。"quick.brown.fox"與兩個queue的binding key都不匹配, 因此將被丟棄。
如果打破我們的約定,使用一個單詞或者四個單詞的routing key例如"orange","quick.orange.male.rabbit"發(fā)送消息將會發(fā)生什么? 這些消息不會匹配任何綁定,因此會丟失。
但是對于"lazy.orange.male.rabbit",即使它有四個單詞,但是它與第二個queue的binding key匹配,因此將會被發(fā)送到第二個queue中。
當一個queue使用"#"(hash)作為binding key,那么它將會接收所有的消息,忽略routing key,就好像使用了fanout exchange。 當特殊字符”*“(star)和”#“(hash)在綁定中沒有用到,topic exchange將會與direct exchange的行為相同。
了解了topic exchange之后,我們將它用在我們的log系統(tǒng)中,我們定義的routing key將會有兩個單詞組成:"<facility>.<severity>"。
完成的EmitLogTopic.java:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public class EmitLogTopic { ????private static final String EXCHANGE_NAME = "topic_logs"; ????public static void main(String[] argv) ??????????????????throws Exception { ????????ConnectionFactory factory = new ConnectionFactory(); ????????factory.setHost("localhost"); ????????Connection connection = factory.newConnection(); ????????Channel channel = connection.createChannel(); ????????channel.exchangeDeclare(EXCHANGE_NAME, "topic"); ????????String routingKey = getRouting(argv); ????????String message = getMessage(argv); ????????channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); ????????System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); ????????connection.close(); ????} ????//... } |
完整的ReceiveLogsTopic.java:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | public class ReceiveLogsTopic { ??private static final String EXCHANGE_NAME = "topic_logs"; ??public static void main(String[] argv) throws Exception { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????Connection connection = factory.newConnection(); ????Channel channel = connection.createChannel(); ????channel.exchangeDeclare(EXCHANGE_NAME, "topic"); ????String queueName = channel.queueDeclare().getQueue(); ????if (argv.length < 1) { ??????System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); ??????System.exit(1); ????} ????for (String bindingKey : argv) { ??????channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); ????} ????System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ????Consumer consumer = new DefaultConsumer(channel) { ??????@Override ??????public void handleDelivery(String consumerTag, Envelope envelope, ?????????????????????????????????AMQP.BasicProperties properties, byte[] body) throws IOException { ????????String message = new String(body, "UTF-8"); ????????System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); ??????} ????}; ????channel.basicConsume(queueName, true, consumer); ??} } |
運行的時候從命令行中輸入binding key來進行綁定,接收不同的消息。
Remote procedure call (RPC)
在第二小節(jié)中我們學習了如何使用Work Queues來在多個workers中分發(fā)耗時的任務(wù)。但是如果我們需要調(diào)用遠程計算機上的一個函數(shù)并等待結(jié)果返回呢? 這就是另外一個故事了。這種模式通常稱為遠程過程調(diào)用或RPC。
在這一小節(jié)我們將使用RabbitMQ來構(gòu)建一個RPC系統(tǒng):一個客戶端和一個可擴展的RPC服務(wù)器。由于我們沒有實際的耗時任務(wù)用來分發(fā), 因此我們將創(chuàng)建一個虛擬的RPC服務(wù)返回Fibonacci數(shù)。
Client interface
為了說明RPC服務(wù)是如何使用的,我們將創(chuàng)建一個簡單的客戶端類。它將暴露一個名為call的方法發(fā)送一次RPC請求并且阻塞直到結(jié)果返回:
| 1 2 3 | FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result); |
Callback queue
使用RabbitMQ來進行RPC是非常簡單的。客戶端發(fā)送一個請求到服務(wù)端,服務(wù)端接收后返回響應(yīng)的消息。為了接收到響應(yīng)的消息,我們需要在請求中發(fā)送一個callback 的queue地址。我們可以使用默認的queue(在Java的client中它是exclusive的)。
| 1 2 3 4 5 6 7 8 9 10 | callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties ????????????????????????????.Builder() ????????????????????????????.replyTo(callbackQueueName) ????????????????????????????.build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ... |
Message properties
AMQP協(xié)議預定義了消息的14種屬性。大部分的都很少使用,除了以下這些:
- deliveryMode:標記一條消息是持久化的(使用值2)還是非持久化的(使用其它值)。在第二節(jié)中有過介紹。
- contentType:用來描述mime類型的編碼。例如使用JSON的話就這樣設(shè)置屬性:application/json。
- replyTo:一般用來命名一個回調(diào)queue。
- correlationId:用來關(guān)聯(lián)RPC的請求和響應(yīng)。
我們需要導入新的類:
| 1 | import com.rabbitmq.client.AMQP.BasicProperties; |
Correlation Id
在之前的方法中我們建議為每個RPC請求創(chuàng)建一個回調(diào)queue。這顯得有點影響性能,幸運的是有一種更好的方式——每個客戶端只創(chuàng)建一個回調(diào)queue。 但這產(chǎn)生了一個新問題,無法將相應(yīng)的Response和Request對應(yīng)起來。這個時候就需要用到correlationId屬性。對于每個請求它都將有一個唯一的值。 當我們在回調(diào)queue中接收到消息之后,檢查該屬性,看是否與Request匹配。如果是一個未知的correlationId值,那么我們可以安全的忽略這條消息, 因為它不屬于我們的請求。
你也許會問,為什么我們應(yīng)該忽略回調(diào)queue中未知的消息而不是拋出異常?這是因為服務(wù)端可能會出現(xiàn)競爭條件。盡管不太常見,但是也有可能RPC server在發(fā)送響應(yīng)后掛了, 并且也沒有接收到客戶端發(fā)送的ack。如果發(fā)生了這種情況,RPC server在重啟后將會重新處理這個請求。這就是為什么在客戶端我們需要優(yōu)雅的處理重復的響應(yīng), RPC應(yīng)該是冪等的。
Summary
我們的RPC整個過程是這樣的:
最后來看一下完整的代碼實現(xiàn)。
Fibonacci函數(shù):
| 1 2 3 4 5 | private static int fib(int n) throws Exception { ????if (n == 0) return 0; ????if (n == 1) return 1; ????return fib(n-1) + fib(n-2); } |
完整的RPCServer.java代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | private static final String RPC_QUEUE_NAME = "rpc_queue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { ????QueueingConsumer.Delivery delivery = consumer.nextDelivery(); ????BasicProperties props = delivery.getProperties(); ????BasicProperties replyProps = new BasicProperties ?????????????????????????????????????.Builder() ?????????????????????????????????????.correlationId(props.getCorrelationId()) ?????????????????????????????????????.build(); ????String message = new String(delivery.getBody()); ????int n = Integer.parseInt(message); ????System.out.println(" [.] fib(" + message + ")"); ????String response = "" + fib(n); ????channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); ????channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } |
server端的代碼非常直觀:
- 首先創(chuàng)建一個連接、channel和聲明一個queue。
- 我們也許想要運行不止一個服務(wù)端進程。為了在多個server間做到負載均衡,通過channel.basicQos設(shè)置prefetchCount。
- 我們使用basicConsume來進入queue。然后使用無限循環(huán)來等待請求的消息,處理之后再返回響應(yīng)。
完整的RPCClient.java代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????connection = factory.newConnection(); ????channel = connection.createChannel(); ????replyQueueName = channel.queueDeclare().getQueue(); ????consumer = new QueueingConsumer(channel); ????channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { ????String response = null; ????String corrId = java.util.UUID.randomUUID().toString(); ????BasicProperties props = new BasicProperties ????????????????????????????????.Builder() ????????????????????????????????.correlationId(corrId) ????????????????????????????????.replyTo(replyQueueName) ????????????????????????????????.build(); ????channel.basicPublish("", requestQueueName, props, message.getBytes()); ????while (true) { ????????QueueingConsumer.Delivery delivery = consumer.nextDelivery(); ????????if (delivery.getProperties().getCorrelationId().equals(corrId)) { ????????????response = new String(delivery.getBody()); ????????????break; ????????} ????} ????return response; } public void close() throws Exception { ????connection.close(); } |
客戶端代碼有一點點的復雜:
- 我們創(chuàng)建連接和channel,以及聲明一個exclusive的回調(diào)queue用來接收響應(yīng)的消息。
- 訂閱回調(diào)queue,這樣就可以接收到RPC服務(wù)端響應(yīng)的消息。
- call方法發(fā)出一個RPC請求。
- 我們首先生成一個唯一的correlationId數(shù)字并且保存它——在while循環(huán)中使用它來匹配相應(yīng)的response。
- 下一步,發(fā)送請求的消息,使用兩個屬性:replyTo和correlationId。
- 之后就是等待響應(yīng)的消息返回。
- 在while循環(huán)中做了一些簡單的工作,檢查響應(yīng)的消息的correlationId是否與Request相匹配。如果是的話,則保存響應(yīng)。
- 最終向用戶返回響應(yīng)。
發(fā)送客戶端請求:
| 1 2 3 4 5 6 7 | RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close(); |
這樣就通過RabbitMQ簡單的實現(xiàn)了RPC的通信。
from:?http://www.importnew.com/24329.html
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ指南(下)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ指南(中)
- 下一篇: 消息中间件收录集