生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ保姆级教程
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 前言
- 一、MQ是什么?
-
- 二、在Linux安裝RabbitMQ
-
- 2.1 安裝
- 2.2 RabbitMQ啟動命令
- 2.3 開啟RabbitMQ 后臺管理界面
-
- 2.3 Docker啟動RabbitMQ
- 2.4 常見消息模型
- 2.5 生產者(Producer) / 消費者(Consumer)
- 2.6 工作隊列模式(Work Queues)
- 2.7 參數細節
- 2.8 實現能者多勞
-
- 2.8.1 Ack手動應答防止數據丟失和消息拒收后重新發送
- 2.8.2 預取值
- 2.9 Publish/Subscribe 發布/訂閱
- 2.10 Routing(路由) - Direct
- 2.11 Routing(路由)- Topic
- 三、進階篇 高級特性
-
- 3.1 死信隊列
-
- 3.1.1 死信隊列實戰:消息TTL過期
- 3.1.2 死信隊列實戰:隊列達到最大長度 設置正常隊列最大長度
- 3.1.3 死信隊列實戰:消息被拒
- 3.2 基于SpringBoot實現延遲隊列
- 3.3 發布確認 高級特性
-
- 3.3.1 可靠性投遞confirm模式
- 3.3.2 可靠性投遞return模式
- 3.4 優先級隊列
- 3.5 消費端限流
前言
提示:RaabitMQ消息隊列的學習。
一、MQ是什么?
- MQ全稱 Message Queue(消息隊列),是在消息的傳輸過程中保存消息的容器。多用于分布式系統
之間進行通信。 - RabbitMQ 是一個消息中間件:它接受并轉發消息。你可以把它當做一個快遞站點,當你要發送一個包
裹時,你把你的包裹放到快遞站,快遞員最終會把你的快遞送到收件人那里,按照這種邏輯 RabbitMQ 是
一個快遞站,一個快遞員幫你傳遞快件。RabbitMQ 與快遞站的主要區別在于,它不處理快件而是接收,
存儲和轉發消息數據。
- 工作原理
1.1 AMQP
- AMQP,即 Advanced Message Queuing Protocol(高級消息隊列協議),是一個網絡協議,是應用
層協議的一個開放標準,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,遵
循此協議,不收客戶端和中間件產品和開發語言限制。2006年,AMQP 規范發布。類比HTTP。
二、在Linux安裝RabbitMQ
2.1 安裝
1. 我們把erlang環境與rabbitMQ 安裝包解壓到
Linux2. rpm
-ivh erlang安裝包
3. yum install socat
-y 安裝依賴
/ rpm
-ivh socat依賴包
--force
--nodeps
4. rpm
-ivh rabbitmq安裝包
2.2 RabbitMQ啟動命令
1. 開啟服務
/sbin
/service rabbitmq
-server start
/ service rabbitmq
-server start
2. 停止服務 service rabbitmq
-server stop
3. 重啟服務 service rabbitmq
-server restart
2.3 開啟RabbitMQ 后臺管理界面
1. rabbitmq
-plugins enable rabbitmq_management
1. 創建rabbitMQ賬號rabbitmqctl add_user 用戶名 密碼
2. 設置用戶角色rabbitmqctl set_user_tags 用戶名 administrator #設置用戶名為超級管理員
3. 設置用戶權限rabbitmqctl set_permissions
-p
"/" admin
".*" ".*" ".*"4. 查看rabbitmq的用戶和角色rabbitmqctl list_users
5. 登錄rabbitMQ 界面:
Linux虛擬機ip
:15672 即可
2.3.1 登錄rabbitMQ UI界面
記得開放
15672端口訪問
Linux虛擬機ip
:15672 即可
輸入賬戶密碼后看到這個界面代表成功
2.3 Docker啟動RabbitMQ
Docker安裝
1. docker pull rabbitmq
:3-management
2. 開啟rabbitMQdocker run \
-e RABBITMQ_DEFAULT_USER
=root \
-e RABBITMQ_DEFAULT_PASS
=123456 \
--name mq \
--hostname mq1 \
-p
15672:15672 \
-p
5672:5672 \
-d \rabbitmq
:3-management
2.4 常見消息模型
- channel:操作MQ的工具
- exchange:路由消息到隊列中
- queue:緩存消息
- virtual host:虛擬主機,是對queue、exchange等資源的邏輯分組
2.5 生產者(Producer) / 消費者(Consumer)
<dependencies><dependency><groupId>com.rabbitmq
</groupId><artifactId>amqp-client
</artifactId><version>5.7.3
</version></dependency><dependency><groupId>commons-io
</groupId><artifactId>commons-io
</artifactId><version>2.4
</version></dependency></dependencies>
1234567891011121314
public class Producer {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.queueDeclare(QUEUE_NAME
,false,false,false,null);String message
="hello rabbitMQ";channel
.basicPublish("",QUEUE_NAME
,null,message
.getBytes());System.out
.println("消息發送完畢");channel
.close();connection
.close();}
}
public class Consumer {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println(new String(message
.getBody()));};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消費消息被中斷");};channel
.basicConsume(QUEUE_NAME
,true,deliverCallback
,cancelCallback
);}
}
2.6 工作隊列模式(Work Queues)
- 模式說明
- Work Queues:與入門程序的簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消費,采用的是 輪詢機制
- 應用場景:對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度
- 工作模式:生產者
public class ProducerWorkQueue {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.queueDeclare(QUEUE_NAME
,false,false,false,null);for (int i
= 1; i
<= 10; i
++) {String message
=i
+"hello rabbitMQ";channel
.basicPublish("",QUEUE_NAME
,null,message
.getBytes());System.out
.println("消息發送完畢");}channel
.close();connection
.close();}
}
public class ConsumerWorkQueues1 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println(new String(message
.getBody()));};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消費消息被中斷");};channel
.basicConsume(QUEUE_NAME
,true,deliverCallback
,cancelCallback
);}
}
public class ConsumerWorkQueues2 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println(new String(message
.getBody()));};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消費消息被中斷");};channel
.basicConsume(QUEUE_NAME
,true,deliverCallback
,cancelCallback
);}
}
- 結果:各執行五次,也驗證了 我們上面所說的 輪詢機制
- 小結:
一個消息只能有一個接收者,但是可以有多個接收者
2.7 參數細節
- durable:是否進行持久化,當前隊列如果進行持久化,我們重啟rabbitMQ后當前隊列依舊存在
channel
.queueDeclare(QUEUE_NAME
,(durable
)true/false,false,false,null);
- props :隊列中的信息是否持久化,若消息持久化,我們重啟rabbitMQ后當前隊列依舊存在
channel
.basicPublish("",QUEUE_NAME
, MessageProperties.PERSISTENT_TEXT_PLAIN
,message
.getBytes());
- autoDelete:是否自動刪除,最后一個消費者斷開連接后,該隊列是否自動刪除
channel
.queueDeclare(QUEUE_NAME
,false,false,(autoDelete的參數位置
)false,null);
若開啟了自動應答
,rabbitMQ消息隊列分配給消費者
10個數據
,只要消費者拿到消息隊列的數據時
,就會告訴消息隊列
,數據處理完畢。若當我們處理到第
5個數據時
,消費者出現了宕機
,死掉了
,則會出現數據丟失
channel
.basicConsume(QUEUE_NAME
,(autoAck是否自動應答
)false,deliverCallback
,cancelCallback
);
2.8 實現能者多勞
-
業務場景:
當我們的兩個消費者執行業務時,a消費者執行速度快,b消費者執行速度慢,我們想讓執行速度快的多執行,應當如何實現呢?
- 開啟不公平分發,能者多勞 channel.basicQos(1); 0:輪詢機制 1:能者多勞
- 開啟手動確認
-
消費者a
public class ConsumerWorkQueues1 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.basicQos(1);DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println(new String(message
.getBody()));channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消費消息被中斷");};channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
}
public class ConsumerWorkQueues2 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.basicQos(1);DeliverCallback deliverCallback
=(consumerTag
, message
)-> {try {Thread.sleep(100);} catch (InterruptedException e
) {e
.printStackTrace();}System.out
.println(new String(message
.getBody()));channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消費消息被中斷");};channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
2.8.1 Ack手動應答防止數據丟失和消息拒收后重新發送
- 應用場景:兩個消費者每次都從隊列中來獲取消息,若消費者a正常執行,消費者b在執行過程中出現了宕機,掛掉了那么我們未被消費的消息會被重新放回到隊列中,防止消息丟失。
生產者
public class ProducerWorkQueue {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.queueDeclare(QUEUE_NAME
,false,false,false,null);Scanner scanner
= new Scanner(System.in
);while (true){String msg
= scanner
.nextLine();channel
.basicPublish("",QUEUE_NAME
, null,msg
.getBytes());System.out
.println("消息發送完畢");}}
}
消費者a
public class ConsumerWorkQueues1 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println("消費者1===>"+new String(message
.getBody()));try {int i
=3/0;channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);}catch (Exception e
){System.out
.println("拒收消息發生了異常");channel
.basicNack(message
.getEnvelope().getDeliveryTag(),false,true);}};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消費消息被中斷");};channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
}
消費者b
public class ConsumerWorkQueues2 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {System.out
.println("睡10秒");try {Thread.sleep(1000*10);} catch (InterruptedException e
) {e
.printStackTrace();}System.out
.println(new String(message
.getBody()));channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消費消息被中斷");};channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
}
- 當消費者b在消費消息時,我們讓消費者b睡眠10秒模擬業務流程,在這10秒內我們手動關掉消費者b
發送 aa 消費者a接收
發送bb消費者b接收,在消費者b睡眠過程中我們停止消費者b,來看看手動應答的結果
此時我們查看消費者a,出現了本應該是消費者b消費的消息bb
2.8.2 預取值
channel
.basicQos(1); 0:輪詢機制
1:能者多勞 若值
>1代表當前隊列的預取值
,代表當前隊列大概會拿到多少值
2.9 Publish/Subscribe 發布/訂閱
- 也可以叫 廣播模式,當我們的P消費者發送了消息,交給了X(交換機),所有綁定了這個X(交換機)的隊列都可以接收到P消費者發送的消息
- 代碼實現生產者
public class Provider {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order", "fanout");channel
.basicPublish("order","",null,"fanout type message".getBytes());channel
.close();connection
.close();}
}
public class Consumer {public static void main(String[] args
) throws IOException, TimeoutException {ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order","fanout");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"order","");channel
.basicConsume(queueName
,true,(consumerTag
,message
)->{System.out
.println("消費者1===>"+new String(message
.getBody()));},consumerTag
-> System.out
.println("取消消費消息"));}
}
2.10 Routing(路由) - Direct
routing值訂閱模型-Direct(直連)
public class Provider {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("logsExchange","direct");channel
.basicPublish("logsExchange","msgRouting",null,"routing logs direct info 發送了消息".getBytes());channel
.close();connection
.close();}
}
public class Consumer1 {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("logs","direct");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"logsExchange","infoRouting");channel
.queueBind(queueName
,"logsExchange","msgRouting");channel
.basicConsume(queueName
,true,(consumerTag
, message
) -> System.out
.println(new String(message
.getBody())),consumerTag
-> System.out
.println(1));}
}
public class Consumer2 {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("logs","direct");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"logs","error");channel
.queueBind(queueName
,"logs","msg");channel
.basicConsume(queueName
,true,(consumerTag
, message
) -> System.out
.println(new String(message
.getBody())),consumerTag
-> System.out
.println(1));}
}
2.11 Routing(路由)- Topic
- Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。
- 只不過Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符!
#通配符
* (star
) can substitute
for exactly one word :匹配一個詞#
(hash
) can substitute
for zero or more words :匹配一個或多個詞
public class Provider {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order","topic");String routingKey
="user.order";channel
.basicPublish("order",routingKey
,null,("routing logs topic發送了消息"+routingKey
).getBytes());channel
.close();connection
.close();}
}
public class Consumer1 {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order","topic");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"order","user.*");channel
.basicConsume(queueName
,true,(consumerTag
, message
) -> System.out
.println(new String(message
.getBody())),consumerTag
-> System.out
.println(1));}
}
public class Consumer2 {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order","topic");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"order","user.#");channel
.basicConsume(queueName
,true,(consumerTag
, message
) -> System.out
.println(new String(message
.getBody())),consumerTag
-> System.out
.println(1));}
}
三、進階篇 高級特性
3.1 死信隊列
死信,顧名思義就是無法被消費的信息,字面意思可以這樣理解,一般來說,producer將消息投遞到queue里,consumer從queue取出消息進行消費,但某些時候由于特定的原因導致queue中的某些消息無法被消費,這樣的消息如果沒有后續的處理,就變成了死信,自然就有了死信隊列
為了保證訂單業務的消息數據不丟失,需要使用到RabbitMQ的死信隊列機制,當消息消費發生異常時,將消息投入死信隊列中。比如說:用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效
- 生產者:給正產的消息隊列發送消息,并且設置消息過期時間為10S,超過10S消息未被消費,則消息進入死信隊列
public class TTLProvider {public static final String NORMAL_EXCHANGE
="normal_exchange";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("賬戶");factory
.setPassword("密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();AMQP.BasicProperties properties
=new AMQP.BasicProperties().builder().expiration("10000").build();for (int i
= 1; i
<= 10; i
++) {String msg
=""+i
;channel
.basicPublish(NORMAL_EXCHANGE
,"normal",properties
,msg
.getBytes());}System.out
.println("結束發送");}
}
public class TTLConsumer1 {public static final String NORMAL_EXCHANGE
="normal_exchange";public static final String DEAD_EXCHANGE
="dead_exchange";public static final String NORMAL_QUEUE
="normal_queue";public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("賬戶");factory
.setPassword("密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare(NORMAL_EXCHANGE
,"direct");channel
.exchangeDeclare(DEAD_EXCHANGE
,"direct");HashMap<String, Object> map
= new HashMap<>();map
.put("x-dead-letter-exchange",DEAD_EXCHANGE
);map
.put("x-dead-letter-routing-key","dead");channel
.queueDeclare(NORMAL_QUEUE
,false,false,false,map
);channel
.queueDeclare(DEAD_QUEUE
,false,false,false,null);channel
.queueBind(NORMAL_QUEUE
,NORMAL_EXCHANGE
,"normal");channel
.queueBind(DEAD_QUEUE
,DEAD_EXCHANGE
,"dead");DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(NORMAL_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
public class TTLConsumer2 {public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("賬戶");factory
.setPassword("密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(DEAD_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
- 結果:當設置了死信隊列,和TTL過期時間,若超過了過期時間消息未被消費,則消息會轉發到死信隊列
死信隊列產生三大原因 - 消息被拒接
- 消息TTL過期
- 隊列達到最大長度
3.1.1 死信隊列實戰:消息TTL過期
@Configuration
public class RabbitMQConfiguration {public static final String X_EXCHANGE
="X";public static final String Y_DEAD_LETTER_EXCHANGE
="Y";public static final String QUEUE_A
="QA";public static final String QUEUE_B
="QB";public static final String DEAD_QUEUE_D
="QD";@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE
);}@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE
);}@Beanpublic Queue queueA(){Map<String,Object> arg
=new HashMap<>();arg
.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE
);arg
.put("x-dead-letter-routing-key","YD");arg
.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A
).withArguments(arg
).build();}@Beanpublic Queue queueB(){Map<String,Object> arg
=new HashMap<>();arg
.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE
);arg
.put("x-dead-letter-routing-key","YD");arg
.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B
).withArguments(arg
).build();}@Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_QUEUE_D
).build();}@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA
,@Qualifier("xExchange") DirectExchange xExchange
){return BindingBuilder.bind(queueA
).to(xExchange
).with("XA");}@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB
,@Qualifier("xExchange") DirectExchange xExchange
){return BindingBuilder.bind(queueB
).to(xExchange
).with("XB");}@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD
,@Qualifier("yExchange") DirectExchange yExchange
){return BindingBuilder.bind(queueD
).to(yExchange
).with("YD");}
}
@RestController
@RequestMapping("/ttl")
@Slf4j
public class TTLProvider {@Autowiredprivate RabbitTemplate rabbitTemplate
;@GetMapping("/{msg}")public void sendMsg(@PathVariable("msg") String msg
){log
.info("當前發送時間:{}發送了一條消息",new Date().toString());rabbitTemplate
.convertAndSend("X","XA","TTL消息延遲為10S,消息為===>"+msg
);rabbitTemplate
.convertAndSend("X","XB","TTL消息延遲為40S,消息為===>"+msg
);}
}
@Component
@Slf4j
public class DeadLetterConsumer {@RabbitListener(queues
= "QD")public void t1(Message message
, Channel channel
)throws Exception{log
.info("收到死信隊列的消息{},時間為{}",new String(message
.getBody(),"UTF-8"),new Date().toString());}
}
3.1.2 死信隊列實戰:隊列達到最大長度 設置正常隊列最大長度
生產者
public class Producer {public static final String NORMAL_EXCHANGE
="normal_exchange";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();for (int i
= 1; i
<= 10; i
++) {String msg
=""+i
;channel
.basicPublish(NORMAL_EXCHANGE
,"normal",null,msg
.getBytes());}}
}
消費者a
//設置當前正常隊列的長度限制超過長度,后面的消息會進入到死信隊列
map.put(“x-max-length”,6);
public class Consumer01 {public static final String NORMAL_EXCHANGE
="normal_exchange";public static final String DEAD_EXCHANGE
="dead_exchange";public static final String NORMAL_QUEUE
="normal_queue";public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare(NORMAL_EXCHANGE
,"direct");channel
.exchangeDeclare(DEAD_EXCHANGE
,"direct");HashMap<String, Object> map
= new HashMap<>();map
.put("x-dead-letter-exchange",DEAD_EXCHANGE
);map
.put("x-dead-letter-routing-key","dead");map
.put("x-max-length",6);channel
.queueDeclare(NORMAL_QUEUE
,false,false,false,map
);channel
.queueDeclare(DEAD_QUEUE
,false,false,false,null);channel
.queueBind(NORMAL_QUEUE
,NORMAL_EXCHANGE
,"normal");channel
.queueBind(DEAD_QUEUE
,DEAD_EXCHANGE
,"dead");DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(NORMAL_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
消費者b
public class Consumer02 {public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(DEAD_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
3.1.3 死信隊列實戰:消息被拒
生產者
public class Producer {public static final String NORMAL_EXCHANGE
="normal_exchange";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();for (int i
= 1; i
<= 10; i
++) {String msg
="info"+i
;channel
.basicPublish(NORMAL_EXCHANGE
,"normal",null,msg
.getBytes());}}
}
消費者a
- 此消息被拒接,是否重新放回正常隊列, false:不放回 則會放到死信隊列
- 1.channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
- 2.并且開啟手動應答
public class Consumer01 {public static final String NORMAL_EXCHANGE
="normal_exchange";public static final String DEAD_EXCHANGE
="dead_exchange";public static final String NORMAL_QUEUE
="normal_queue";public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("登錄賬戶");factory
.setPassword("登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare(NORMAL_EXCHANGE
,"direct");channel
.exchangeDeclare(DEAD_EXCHANGE
,"direct");HashMap<String, Object> map
= new HashMap<>();map
.put("x-dead-letter-exchange",DEAD_EXCHANGE
);map
.put("x-dead-letter-routing-key","dead");channel
.queueDeclare(NORMAL_QUEUE
,false,false,false,map
);channel
.queueDeclare(DEAD_QUEUE
,false,false,false,null);channel
.queueBind(NORMAL_QUEUE
,NORMAL_EXCHANGE
,"normal");channel
.queueBind(DEAD_QUEUE
,DEAD_EXCHANGE
,"dead");DeliverCallback deliverCallback
=( consumerTag
, message
)->{String msg
=new String(message
.getBody());if("info5".equals(msg
)){System.out
.println("Consumer1接收消息===>"+msg
+"此消息被拒絕");channel
.basicReject(message
.getEnvelope().getDeliveryTag(),false);}else {System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);}};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(NORMAL_QUEUE
,false,deliverCallback
,cancelCallback
);}
}
消費者b
public class Consumer02 {public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(DEAD_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
3.2 基于SpringBoot實現延遲隊列
配置隊列交換機
@Configuration
public class QueueConfig {@Bean("exchange")public DirectExchange exchange(){return new DirectExchange("msg");}@Bean("simpleQue")public Queue simpleQue(){HashMap<String, Object> map
= new HashMap<>();map
.put("x-dead-letter-exchange","dead");map
.put("x-dead-letter-routing-key","deadKey");map
.put("x-message-ttl",10000);return new Queue("simple",false,false,false,map
);}@Beanpublic Binding simpleQueueBandingExchange(@Qualifier("simpleQue") Queue simple
,@Qualifier("exchange") DirectExchange msg
)throws Exception{return BindingBuilder.bind(simple
).to(msg
).with("info");}@Bean("deadExchange")public DirectExchange exchange1(){return new DirectExchange("dead");}@Bean("deadQueue")public Queue deadQ(){return new Queue("deadQue",false,false,false,null);}@Beanpublic Binding deadKeyBindingDeadExchange(@Qualifier("deadQueue")Queue queue
,@Qualifier("deadExchange")DirectExchange dead
){return BindingBuilder.bind(queue
).to(dead
).with("deadKey");}
}
生產者
@RestController
public class Provider {@Autowiredprivate RabbitTemplate rabbitTemplate
;@RequestMapping("/ttl/{message}")public void t1(@PathVariable String message
){String queueName
="simple";Date date
= new Date();System.out
.println(date
);rabbitTemplate
.convertAndSend("msg","info",message
);}
}
消費者
@Component
public class Consumer {@RabbitListener(queues
= "deadQue")public void hello(Message msg
, Channel channel
)throws Exception{System.out
.println("接收到消息"+new String(msg
.getBody()));Date date1
= new Date();System.out
.println(date1
);}
}
3.3 發布確認 高級特性
3.3.1 可靠性投遞confirm模式
- 場景:在生產環境中由于一些不明原因,導致rabbitmq重啟,在rabbitmq重啟期間的生產者消息投遞失敗,導致消息丟失,需要手動處理和恢復。-可靠性投遞confirm模式
- 需要在application核心配置文件中設置發布確認類型
- spring-rabbitmq-publisher-confirm-type: correlated
- 類型1:none:禁用發布確認模式,是默認值
- 類型2:correlated:發布消息成功到交換機后出發回調方法
- 類型3:simple:和correlated效果一樣,但是如果回調返回的是false,會關閉信道,接下來無法發送消息
配置類
@Component
public class confirmConfig
{public static final String CONFIRM_EXCHANGE_NAME
="confirm.exchange";public static final String CONFIRM_QUEUE
="confirm.queue";public static final String CONFIRM_ROUTING_KEY
="confirm";@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME
);}@Bean("confirmQueue")public Queue confirmQueue(){return new Queue(CONFIRM_QUEUE
);}@Beanpublic Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange
,@Qualifier("confirmQueue")Queue confirmQueue
){return BindingBuilder.bind(confirmQueue
).to(confirmExchange
).with(CONFIRM_ROUTING_KEY
);}
}
- 當生產者發送給交換機消息時,交換機的名字錯了,或者交換機掛掉了,會導致消息的丟失,那么我們需要實現回調接口,當交換機收到消息后會給生產者發送回調消息
實現回調接口:實現 RabbitTemplate.ConfirmCallback接口的confirm方法并且將其注入到rabbit模板的內部類中
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate
;@PostConstruct public void init(){rabbitTemplate
.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData
, boolean b
, String s
) {String id
=correlationData
!=null?correlationData
.getId():"";if(b
){log
.info("交換機已經收到了ID為{}的消息",id
);}else {log
.info("交換機為收到了ID為{}的消息,原因是:{}",id
,s
);}}
}
生產者
@RestController
public class ConfirmProducer {@Autowiredprivate RabbitTemplate rabbitTemplate
;@GetMapping("/sendMsg/{msg}")public void t1(@PathVariable String msg
){CorrelationData correlationData
= new CorrelationData();correlationData
.setId("1");rabbitTemplate
.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME
,ConfirmConfig.CONFIRM_ROUTING_KEY
,"嘿嘿嘿".getBytes(),correlationData
);}
}
消費者
@Component
public class ConfirmConsumer {@RabbitListener(queues
= ConfirmConfig.CONFIRM_QUEUE
)public void consumer(Message message
){System.out
.println("高級特性確認發布消費者收到了消息===>"+new String(message
.getBody()));}
}
- 測試:當我們正常發送消息
- 測試:當我們把交換機名字換掉
3.3.2 可靠性投遞return模式
- 場景:若交換機收到消息,隊列沒有收到消息,應該如何解決?
- 需要在application核心配置文件中設置是否回退消息,當消息路由不到消費者
- spring-rabbitmq-publisher-returns=true 開啟回退消息
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate rabbitTemplate
;@PostConstruct public void init(){rabbitTemplate
.setConfirmCallback(this);rabbitTemplate
.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData
, boolean b
, String s
) {String id
=correlationData
!=null?correlationData
.getId():"";if(b
){log
.info("交換機已經收到了ID為{}的消息",id
);}else {log
.info("交換機未收到了ID為{}的消息,原因是:{}",id
,s
);}}@Overridepublic void returnedMessage(Message message
, int i
, String s
, String s1
, String s2
) {log
.info("消息{},被交換機{}退回,原因是{},路由是{}",new String(message
.getBody()),s1
,s
,s2
);}}
3.4 優先級隊列
- 優先級越高,消息先被消費者消費
- 官方設置最大優先級 0-255 超出優先級則報錯 自己使用時數字不必設置很大,會浪費CPU效率
生產者
public class PriorityProducer {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();AMQP.BasicProperties build
= new AMQP.BasicProperties.Builder().priority(10).build();for (int i
= 1; i
<= 10; i
++) {String msg
="info"+i
;if(i
==5){channel
.basicPublish("","hi",build
,msg
.getBytes());}else {channel
.basicPublish("","hi",null,msg
.getBytes());}}}
}
消費者
public class PriorityConsumer {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登錄用戶名");factory
.setPassword("RabbitMQ登錄密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();HashMap<String, Object> map
= new HashMap<>();map
.put("x-max-priority",10);channel
.queueDeclare("hi",false,false,false,map
);channel
.basicConsume("hi",true,(consumerTag
,message
)->{System.out
.println("優先級隊列接收消息順序===>"+new String(message
.getBody()));},(consumerTag
) -> System.out
.println("取消回調"));}
}
- 測試結果:我們定義的是消息5優先級最高,其他消息為默認優先級
3.5 消費端限流
- 參數一:prefetchSize:預先載入的大小 0表示不限制大小
- 參數二:prefetchCount:預先載入的消息條數
- 參數三:global:false
- 注意:autoAck手動應答一定要為false
channel
.basicQos(0,1,false);
12
public class AckProvider {public static final String QUEUE_NAME
="hello_Ack";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("用戶");factory
.setPassword("密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.queueDeclare(QUEUE_NAME
,false,false,false,null);Scanner scanner
= new Scanner(System.in
);while (true){String msg
= scanner
.nextLine();channel
.basicPublish("",QUEUE_NAME
, null,msg
.getBytes());System.out
.println("消息發送完畢");}}
}
public class AckConsumer2 {public static final String QUEUE_NAME
="hello_Ack";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("用戶");factory
.setPassword("密碼");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {System.out
.println(new String(message
.getBody()));try {Thread.sleep(1000*5);} catch (InterruptedException e
) {e
.printStackTrace();}channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消費消息被中斷");};channel
.basicQos(0,1,false);channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
}
總結
以上是生活随笔為你收集整理的RabbitMQ保姆级教程的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。