javascript
Spring Cloud Stream如何处理消息重复消费
最近收到好幾個類似的問題:使用Spring Cloud Stream操作RabbitMQ或Kafka的時候,出現消息重復消費的問題。通過溝通與排查下來主要還是用戶對消費組的認識不夠。其實,在之前的博文以及《Spring Cloud微服務實戰》一書中都有提到關于消費組的概念以及作用。
那么什么是消費組呢?為什么要用消費組?它解決什么問題呢?摘錄一段之前博文的內容,來解答這些疑問:
通常在生產環境,我們的每個服務都不會以單節點的方式運行在生產環境,當同一個服務啟動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。默認情況下,當生產者發出一條消息到綁定通道上,這條消息會產生多個副本被每個消費者實例接收和處理(出現上述重復消費問題)。但是有些業務場景之下,我們希望生產者產生的消息只被其中一個實例消費,這個時候我們需要為這些消費者設置消費組來實現這樣的功能。
詳細也可查看原文:消息驅動的微服務(消費組)。
下面,通過一個例子來看看如何使用消費組:
問題重現
構建消息消費端
第一步:創建綁定接口,綁定example-topic輸入通道(默認情況下,會綁定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。
| interface ExampleBinder { String NAME = "example-topic"; (NAME) SubscribableChannel input(); } |
第二步:對上述輸入通道創建監聽與處理邏輯。
| (ExampleBinder.class) public class ExampleReceiver { private static Logger logger = LoggerFactory.getLogger(ExampleReceiver.class); (ExampleBinder.NAME) public void receive(String payload) { logger.info("Received: " + payload); } } |
第三步;創建應用主類和配置文件
public class ExampleApplication { public static void main(String[] args) { SpringApplication.run(ExampleApplication.class, args); } } |
| spring.application.name=stream-consumer-group server.port=0 |
這里設置server.port=0,以方便在本地啟動多實例來重現問題。
完成上述操作之后,啟動兩個該應用的實例,以備后續調用。
構建消息生產端
比較簡單,需要注意的是,使用@Output創建一個同名的輸出綁定,這樣發出的消息才能被上述啟動的實例接收到。具體實現如下:
| (SpringRunner.class) (value = {ExampleApplicationTests.ExampleBinder.class}) public class ExampleApplicationTests { private ExampleBinder exampleBinder; public void exampleBinderTester() { exampleBinder.output().send(MessageBuilder.withPayload("Produce a message from : http://blog.didispace.com").build()); } public interface ExampleBinder { String NAME = "example-topic"; (NAME) MessageChannel output(); } } |
啟動上述測試用例之后,可以發現之前啟動的兩個實例都收到的消息,并在日志中打印了:Received: Produce a message from : http://blog.didispace.com。消息重復消費的問題成功重現!
使用消費組解決問題
如何解決上述消息重復消費的問題呢?我們只需要在配置文件中增加如下配置即可:
| spring.cloud.stream.bindings.example-topic.group=aaa |
當我們指定了某個綁定所指向的消費組之后,往當前主題發送的消息在每個訂閱消費組中,只會有一個訂閱者接收和消費,從而實現了對消息的負載均衡。只所以之前會出現重復消費的問題,是由于默認情況下,任何訂閱都會產生一個匿名消費組,所以每個訂閱實例都會有自己的消費組,從而當有消息發送的時候,就形成了廣播的模式。
另外,需要注意上述配置中example-topic是在代碼中@Output和@Input中傳入的名字。
代碼示例
本文示例讀者可以通過查看下面倉庫的中的stream-consumer-group項目:
- Github
- Gitee
如果您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!
以下專題教程也許您會有興趣
- Spring Boot基礎教程
- Spring Cloud基礎教程
總結
以上是生活随笔為你收集整理的Spring Cloud Stream如何处理消息重复消费的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 磁盘I/O那些事
- 下一篇: 线性代数应该这样讲(二)