javascript
【SpringBoot MQ 系列】RabbitListener 消费基本使用姿势介绍
【MQ 系列】RabbitListener 消費基本使用姿勢介紹
之前介紹了 rabbitmq 的消息發送姿勢,既然有發送,當然就得有消費者,在 SpringBoot 環境下,消費可以說比較簡單了,借助@RabbitListener注解,基本上可以滿足你 90%以上的業務開發需求
下面我們來看一下@RabbitListener的最最常用使用姿勢
I. 配置
首先創建一個 SpringBoot 項目,用于后續的演示
- springboot 版本為2.2.1.RELEASE
- rabbitmq 版本為?3.7.5?(安裝教程可參考:?【MQ 系列】springboot + rabbitmq 初體驗)
依賴配置文件 pom.xml
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.1.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version> </properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 注意,下面這個不是必要的哦--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency> </dependencies><build><pluginManagement><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></pluginManagement> </build> <repositories><repository><id>spring-snapshots</id><name>Spring Snapshots</name><url>https://repo.spring.io/libs-snapshot-local</url><snapshots><enabled>true</enabled></snapshots></repository><repository><id>spring-milestones</id><name>Spring Milestones</name><url>https://repo.spring.io/libs-milestone-local</url><snapshots><enabled>false</enabled></snapshots></repository><repository><id>spring-releases</id><name>Spring Releases</name><url>https://repo.spring.io/libs-release-local</url><snapshots><enabled>false</enabled></snapshots></repository> </repositories>在application.yml配置文件中,添加 rabbitmq 的相關屬性
spring:rabbitmq:virtual-host: /username: adminpassword: adminport: 5672host: 127.0.0.1II. 消費姿勢
本文將目標放在實用性上,將結合具體的場景來演示@RabbitListener的使用姿勢,因此當你發現看完本文之后這個注解里面有些屬性還是不懂,請不要著急,下一篇會一一道來
0. mock 數據
消費消費,沒有數據,怎么消費呢?所以我們第一步,先創建一個消息生產者,可以往 exchange 寫數據,供后續的消費者測試使用
本篇的消費主要以 topic 模式來進行說明(其他的幾個模式使用差別不大,如果有需求的話,后續補齊)
@RestController public class PublishRest {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping(path = "publish")public boolean publish(String exchange, String routing, String data) {rabbitTemplate.convertAndSend(exchange, routing, data);return true;} }提供一個簡單 rest 接口,可以指定往哪個 exchange 推送數據,并制定路由鍵
1. case1: exchange, queue 已存在
對于消費者而言其實是不需要管理 exchange 的創建/銷毀的,它是由發送者定義的;一般來講,消費者更關注的是自己的 queue,包括定義 queue 并與 exchange 綁定,而這一套過程是可以直接通過 rabbitmq 的控制臺操作的哦
所以實際開發過程中,exchange 和 queue 以及對應的綁定關系已經存在的可能性是很高的,并不需要再代碼中額外處理;
在這種場景下,消費數據,可以說非常非常簡單了,如下:
/*** 當隊列已經存在時,直接指定隊列名的方式消費** @param data*/ @RabbitListener(queues = "topic.a") public void consumerExistsQueue(String data) {System.out.println("consumerExistsQueue: " + data); }直接指定注解中的queues參數即可,參數值為對列名(queueName)
2. case2: queue 不存在
當 queue 的 autoDelete 屬性為 false 時,上面的使用場景還是比較合適了;但是,當這個屬性為 true 時,沒有消費者隊列就會自動刪除了,這個時候再用上面的姿勢,可能會得到下面的異常
通常這種場景下,是需要我們來主動創建 Queue,并建立與 Exchange 的綁定關系,下面給出@RabbitListener的推薦使用姿勢
/*** 隊列不存在時,需要創建一個隊列,并且與exchange綁定*/ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n1", durable = "false", autoDelete = "true"),exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC),key = "r")) public void consumerNoQueue(String data) {System.out.println("consumerNoQueue: " + data); }一個注解,內部聲明了隊列,并建立綁定關系,就是這么神奇!!!
注意@QueueBinding注解的三個屬性:
- value: @Queue 注解,用于聲明隊列,value 為 queueName, durable 表示隊列是否持久化, autoDelete 表示沒有消費者之后隊列是否自動刪除
- exchange: @Exchange 注解,用于聲明 exchange, type 指定消息投遞策略,我們這里用的 topic 方式
- key: 在 topic 方式下,這個就是我們熟知的 routingKey
以上,就是在隊列不存在時的使用姿勢,看起來也不復雜
3. case3: ack
在前面 rabbitmq 的核心知識點學習過程中,會知道為了保證數據的一致性,有一個消息確認機制;
我們這里的 ack 主要是針對消費端而言,當我們希望更改默認 ack 方式(noack, auto, manual),可以如下處理
/*** 需要手動ack,但是不ack時** @param data*/ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n2", durable = "false", autoDelete = "true"),exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL") public void consumerNoAck(String data) {// 要求手動ack,這里不ack,會怎樣?System.out.println("consumerNoAck: " + data); }上面的實現也比較簡單,設置ackMode=MANUAL,手動 ack
但是,請注意我們的實現中,沒有任何一個地方體現了手動 ack,這就相當于一致都沒有 ack,在后面的測試中,可以看出這種不 ack 時,會發現數據一直在unacked這一欄,當 Unacked 數量超過限制的時候,就不會再消費新的數據了
4. case4: manual ack
上面雖然選擇 ack 方式,但是還缺一步 ack 的邏輯,接下來我們看一下如何補齊
/*** 手動ack** @param data* @param deliveryTag* @param channel* @throws IOException*/ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n3", durable = "false", autoDelete = "true"),exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL") public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {System.out.println("consumerDoAck: " + data);if (data.contains("success")) {// RabbitMQ的ack機制中,第二個參數返回true,表示需要將這條消息投遞給其他的消費者重新消費channel.basicAck(deliveryTag, false);} else {// 第三個參數true,表示這個消息會重新進入隊列channel.basicNack(deliveryTag, false, true);} }請注意,方法多了兩個參數
- deliveryTag: 相當于消息的唯一標識,用于 mq 辨別是哪個消息被 ack/nak 了
- channel: mq 和 consumer 之間的管道,通過它來 ack/nak
當我們正確消費時,通過調用?basicAck?方法即可
// RabbitMQ的ack機制中,第二個參數返回true,表示需要將這條消息投遞給其他的消費者重新消費 channel.basicAck(deliveryTag, false);當我們消費失敗,需要將消息重新塞入隊列,等待重新消費時,可以使用?basicNack
// 第三個參數true,表示這個消息會重新進入隊列 channel.basicNack(deliveryTag, false, true);5. case5: 并發消費
當消息很多,一個消費者吭哧吭哧的消費太慢,但是我的機器性能又杠杠的,這個時候我就希望并行消費,相當于同時有多個消費者來處理數據
要支持并行消費,如下設置即可
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n4", durable = "false", autoDelete = "true"),exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), concurrency = "4") public void multiConsumer(String data) {System.out.println("multiConsumer: " + data); }請注意注解中的concurrency = "4"屬性,表示固定 4 個消費者;
除了上面這種賦值方式之外,還有一種?m-n?的格式,表示 m 個并行消費者,最多可以有 n 個
(額外說明:這個參數的解釋實在SimpleMessageListenerContainer的場景下的,下一篇文章會介紹它與DirectMessageListenerContainer的區別)
6. 測試
通過前面預留的消息發送接口,我們在瀏覽器中請求:?http://localhost:8080/publish?exchange=topic.e&routing=r&data=wahaha
然后看一下輸出,五個消費者都接收到了,特別是主動 nak 的那個消費者,一直在接收到消息;
(因為一直打印日志,所以重啟一下應用,開始下一個測試)
然后再發送一條成功的消息,驗證下手動真確 ack,是否還會出現上面的情況,請求命令:?http://localhost:8080/publish?exchange=topic.e&routing=r&data=successMsg
然后再關注一下,沒有 ack 的那個隊列,一直有一個 unack 的消息
II. 其他
系列博文
- 【MQ 系列】springboot + rabbitmq 初體驗
- 【MQ 系列】RabbitMq 核心知識點小結
- 【MQ 系列】SprigBoot + RabbitMq 發送消息基本使用姿勢
- 【MQ 系列】RabbitMq 消息確認/事務機制的使用姿勢
項目源碼
- 工程:GitHub - liuyueyi/spring-boot-demo: Spring Boot & Spring Cloud & Spring Security Demo Case(Spring學習示例實戰項目)
- 源碼:spring-boot-demo/spring-boot/302-rabbitmq-consumer at master · liuyueyi/spring-boot-demo · GitHub
1.?一灰灰 Blog:?一灰灰Blog
一灰灰的個人博客,記錄所有學習和工作中的博文,歡迎大家前去逛逛
2. 聲明
盡信書則不如,以上內容,純屬一家之言,因個人能力有限,難免有疏漏和錯誤之處,如發現 bug 或者有更好的建議,歡迎批評指正,不吝感激
- 微博地址:?小灰灰 Blog
- QQ: 一灰灰/3302797840
3. 掃描關注
一灰灰 blog
總結
以上是生活随笔為你收集整理的【SpringBoot MQ 系列】RabbitListener 消费基本使用姿势介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 姓苏的男孩取什么名字好
- 下一篇: 猕猴桃为什么叫猕猴桃?