rabbitmq rpc
目錄
概述
過程描述
代碼
結果
遠程過程調用(RPC): 當客戶端發送請求到遠程服務器,遠程服務器接收請求并處理結果,這時候將結果響應給客戶端,這個過程被稱為遠程過程調用
隊列
在整個過程中用會涉及到兩個隊列一個是專門保存請求的隊列,稱為rpc_queue,另一個隊列被稱為響應隊列,專門用于保存服務器處理的響應結果,這個隊列的名字是隨機生成的字符串。
消息的基本屬性BasicProperties
響應隊列名字 回復(replyTo):是響應隊列的名字,當服務器接收并處理好結果,這時候服務器需要知道將響應的信息發送到哪個隊列中;
關聯id(correlationId):是一個UUID值,發消息的時候會帶上這個值,該值在客戶端接收響應時用于判斷接收到的響應消息是否是自己發出請求對應的響應; 客戶端在發送請求時需要帶上replyTo和correlationId兩個屬性。
過程描述
客戶端發送消息到請求隊列,在發送請求時需要指定兩個值(replyTo和correlationId)----------->服務端為隨時接受到請求消息,需要預先訂閱請求隊列(rpc_queue),,當服務端接收到請求消息時對請求進行處理,將處理結果發送到響應隊列(隨機隊列)中--------------->客戶端也需要預先訂閱響應隊列(隨機隊列),以便服務器發送響應消息到響應隊列中,客戶端能及時收到響應結果,服務器在將響應發送到響應隊列中還要指定correlationId值(唯一標識),這樣客戶端接收到消息時就可以通過correlationId的值是否和發送請求的關聯id值是否相同,如果相同就證明這個響應結果就是這個請求對應的響應結果。
注意:這個預先訂閱響應隊列的步驟需要在客戶端中完成,最好在客戶端發送請求消息前就完成。.
代碼
服務端:
package com.ll.mq.hellomq.rpc;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; //ll public class Service {public static void main(String[] args) {try {// 創建連接工廠ConnectionFactory factory = new ConnectionFactory(); // 設置RabbitMQ地址factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456"); // 創建一個連接Connection connection = factory.newConnection(); // 創建一個頻道final Channel channel = connection.createChannel();String rpc_queuqu = "rpc_queue";channel.queueDeclare(rpc_queuqu, false, false, false, null);// DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDeliveryDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");// 服務器端接收到消息并處理消息String response = "{'code': 200, 'data': '" + message+ "'}";// // 將消息發布到reply_to響應隊列中AMQP.BasicProperties replyProperties = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String replyTo = properties.getReplyTo();channel.basicPublish("", replyTo, replyProperties, response.getBytes("UTF-8"));System.out.println("服務端:請求已處理完畢,響應結果" + response + "已發送到響應隊列中"); // // 手動應答channel.basicAck(envelope.getDeliveryTag(), true);}}; // 自動回復隊列應答channel.basicConsume(rpc_queuqu, false, consumer);} catch (Exception e) {e.printStackTrace();}} }客戶端
package com.ll.mq.hellomq.rpc;import java.io.IOException; import java.util.UUID;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope;public class Client {public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 預先訂閱響應結果的隊列,先訂閱響應隊列,再發送消息到請求隊列String reyply_to_queue = channel.queueDeclare().getQueue();final String correlationId = UUID.randomUUID().toString();DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {if (properties.getCorrelationId().equals(correlationId)) {String message = new String(body, "UTF-8");System.out.println("已接收到服務器的響應結果:" + message);}}};channel.basicConsume(reyply_to_queue, true, consumer);// 將消息發送到請求隊列中String rpc_queuqu = "rpc_queue";String message = "Hello RabbitMQ";AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo(reyply_to_queue).build();channel.basicPublish("", rpc_queuqu, properties, message.getBytes("UTF-8"));System.out.println("已發出請求請求消息:" + message);} catch (Exception e) {e.printStackTrace();}} }結果
客戶端
已發出請求請求消息:Hello RabbitMQ
已接收到服務器的響應結果:{'code': 200, 'data': 'Hello RabbitMQ'}
服務端
服務端:請求已處理完畢,響應結果{'code': 200, 'data': 'Hello RabbitMQ'}已發送到響應隊列中
?
?
總結
以上是生活随笔為你收集整理的rabbitmq rpc的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 23种设计模式C++源码与UML实现--
- 下一篇: 23种设计模式C++源码与UML实现--