RabbitMQ开发详解
?
?
目錄
?
開發步驟
引入client
生產者
消費者
應用場景
簡單隊列
工作隊列
發布/訂閱
路由模式
topic模式
rpc模式
發布確認
開發步驟
引入client
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.2.0</version> </dependency>生產者
1、引入類
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;2、創建Connection
ConnectionFactory factory = new ConnectionFactory();// 設置服務地址factory.setHost("127.0.0.1");// 端口factory.setPort(5672);// vhostfactory.setVirtualHost("/vhost_test");// 用戶名factory.setUsername("admin");// 密碼factory.setPassword("123456");Connection connection = factory.newConnection();3、創建Channel
Channel channel = connection.createChannel()channel設置:
?
4、聲明交換器、隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);exchange聲明:
DeclareOk exchangeDeclare?(String exchange, String type) throws IOException; DeclareOk exchangeDeclare?(String exchange, BuiltinExchangeType type) throws IOException; DeclareOk exchangeDeclare?(String exchange, String type, boolean durable) throws IOException; DeclareOk exchangeDeclare?(String exchange, BuiltinExchangeType type, boolean durable) throws IOException; DeclareOk exchangeDeclare?(String exchange, String type, boolean durable, boolean autoDelete, Map<String,?Object> arguments) throws IOException; DeclareOk exchangeDeclare?(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String,?Object> arguments) throws IOException; DeclareOk exchangeDeclare?(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String,?Object> arguments) throws IOException; DeclareOk exchangeDeclare?(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,?Object> arguments) throws IOException;?參數說明:
-
exchange:exchange名稱
-
type:exchange類型,BuiltinExchangeType 枚舉類型包括:fanout,direct,topic,header。
-
durable:是否持久化
-
internal:是否內部exchange,生產者不能直接發布到內部exchange。
-
arguments:其他參數,用于構造exchange。
隊列聲明:
DeclareOk queueDeclare() throws IOException; DeclareOk queueDeclare?(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,?Object> arguments) throws IOException;參數說明:
- queue:隊列名稱
- durable:是否持久化
- exclusive:是否私有,僅當前程序可訪問。
- autoDelete:當最后一個消費者取消訂閱后,自動刪除。
- arguments:其他參數,創建queue時使用。
?
5、發送消息
String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes());?
void basicPublish?(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException; void basicPublish?(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException; void basicPublish?(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;參數說明:
- ?exchange:
- routingKey:
- props:消息屬性
- body:消息體的byte[]格式。
- mandatory:當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返回給生產者(Basic.Return + Content-Header + Content-Body);當mandatory設置為false時,出現上述情形broker會直接將消息扔掉。
- immediate:當immediate標志位設置為true時,如果exchange在將消息路由到queue(s)時發現對于的queue上么有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。
mandatory標志告訴服務器至少將該消息route到一個隊列中,否則將消息返還給生產者;immediate標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。
在RabbitMQ3.0以后的版本里,去掉了immediate參數的支持,發送帶immediate標記的publish會返回如下錯誤:
“{amqp_error,not_implemented,“immediate=true”,‘basic.publish’}”,immediate標記會影響鏡像隊列性能,增加代碼復雜性,并建議采用“TTL”和“DLX”等方式替代。
?
消費者
1、引入類
同生產者
2、創建Connection
同生產者
3、創建Channel
同生產者
4、聲明交換器,隊列
同生產者
5、構造Consumer
// 創建消費者Consumer consumer = new DefaultConsumer(channel) {// 獲取消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {String msg = new String(body, "utf-8");System.out.println("接收到消息——" + msg);}};6、接收消息并處理
// 監聽隊列channel.basicConsume(QUEUE, true, consumer); String basicConsume?(String queue, Consumer callback) throws IOException String basicConsume?(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, Consumer callback) throws IOException String basicConsume?(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, Map<String,?Object> arguments, Consumer callback) throws IOException String basicConsume?(String queue, boolean autoAck, Map<String,?Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, boolean autoAck, Map<String,?Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, Map<String,?Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,?Object> arguments, Consumer callback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,?Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,?Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,?Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException參數說明:
- queue:隊列名稱。
- autoAck:服務器收到消息后是否自動應答。
- consumerTag:消費者標簽,用來區分多個消費者
- noLocal:設置為true,表示 不能將同一個Conenction中生產者發送的消息傳遞給這個Connection中 的消費者
- exclusive:是否排他
- arguments:消費者的參數
- callback:消費者 DefaultConsumer,用于消費消息,需要重寫其中的方法
- 其他callback
- handleCancel:除了調用basicCancel的其他原因導致消息被取消時調用。
- handleCancelOk:basicCancel調用導致的訂閱取消時被調用。
- handleConsumeOk:任意basicComsume調用導致消費者被注冊時調用。
- handleDelivery:消息接收時被調用。
- handleRecoverOk:basic.recover-ok被接收時調用
- handleShutdownSignal:當Channel與Conenction關閉的時候會調用。
應用場景
各場景比較
| 應用場景 | exchange | 隊列 | 生產者端 | 消費者端 |
| 簡單隊列 | 無 | 單個隊列 | 發送到指定隊列 | 自動應答 |
| 工作隊列 | 無 | 多個隊列 | 發送到指定隊列 | 自動應答/? ? 手動應答,公平分發 |
| 發布/訂閱 | fanout | 多個隊列 | 發送到指定exchange,不設置routing key | 消費指定隊列。 |
| 路由模式 | direct | 多個隊列 | 發送到指定exchange,設置routing key | 消費指定隊列。指定1個或者多個binging key |
| topic模式 | topic | 多個隊列 | 發送到指定exchange,設置routing key。key中包含點號(.)。 | 消費指定隊列。指定1個或者多個binging key,key中包含點號(.)。 |
| rpc | ? | ? | ? | ? |
| 發布確認 | ? | ? | ? | ? |
簡單隊列
生產者直接發送消息到隊列,消費者直接消費隊列消息。
/*生產者代碼 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //直接發送消息到隊列。exchange參數為"" channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); /* 消費者代碼 */ //聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定義消息消費 DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'"); }; //開始消費 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });?
工作隊列
一般在實際應用中,生產者發送消息耗時較少,反應較快,反而是消費者因為要處理業務邏輯,處理時間可能會很慢,這樣隊列中會積壓很多消息,所以需要多個消費者分攤壓力,這個時候可以使用工作隊列。
生產者消費者代碼與簡單隊列一樣,差別為運行多個消費者代碼實例。
消費者默認采用Round-robin方式輪詢分發,每個消費者接收到的消息數基本一樣。如果消費者處理消息速度不一致,會導致一個空閑,一個繁忙。
可以采用公平模式,如果消費者未處理完消息,則隊列不會再發送消息給此消費者,只到上一條消息處理完。
?修改:
- 添加channel.basicQos(1):保證一次只分發一條消息。
- channel.basicAck(envelope.getDeliveryTag(), false):手動確認消息。false表示確認接收消息,true表示拒絕接收消息。
- channel.basicConsume(QUEUE, false, consumer):設置自動應答為false。
?
發布/訂閱
生產者沒有把消費發送給隊列,而是發送給exchange,由exchange進行路由到綁定的隊列,消費者僅消費對應隊列,例如事件通知通過郵件和短信進行通知。
exchange的類型為fanout。
private static final String EXCHANGE_NAME = "logs";//聲明一個exchange:logs channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//發送消息到exchange:logs channel.basicPublish( "logs", "", null, message.getBytes());?
private static final String EXCHANGE_NAME = "logs";//聲明exchange:logschannel.exchangeDeclare(EXCHANGE_NAME, "fanout");//聲明一個隨機名稱的臨時隊列String queueName = channel.queueDeclare().getQueue();//綁定隊列與exchangechannel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//開始消費臨時隊列:channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });?
路由模式
exchange類型為direct。
private static final String EXCHANGE_NAME = "direct_logs";//指定exchange為direct channel.exchangeDeclare(EXCHANGE_NAME, "direct");String severity = getSeverity(argv); String message = getMessage(argv); //指定routing key:severity channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); //聲明exchange:類型為:directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");String queueName = channel.queueDeclare().getQueue();//綁定隊列與exchange,routing key:severityfor (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};//消費消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}?
topic模式
exchange類型為topic。
topic模式與direct模式代碼一樣,區別為binding key與routing key 必須包含點號(.)。
?
channel.exchangeDeclare(EXCHANGE_NAME, "topic");String routingKey = getRouting(argv); String message = getMessage(argv);channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));?
channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });?
rpc模式
發布確認
?
?
總結
以上是生活随笔為你收集整理的RabbitMQ开发详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 开发管理工具
- 下一篇: spring boot添加 LocalD