javascript
Spring Cloud Stream如何消费自己生产的消息
在上一篇《Spring Cloud Stream如何處理消息重復消費》中,我們通過消費組的配置解決了多實例部署情況下消息重復消費這一入門時的常見問題。本文將繼續說說在另外一個被經常問到的問題:如果微服務生產的消息自己也想要消費一份,應該如何實現呢?
- 常見錯誤
在放出標準答案前,先放出一個常見的錯誤姿勢和告警信息(以便您可以通過搜索引擎找到這里_)。以下錯誤基于Spring Boot 2.0.5、Spring Cloud Finchley SR1。
首先,根據入門示例,為了生產和消費消息,需要定義兩個通道:一個輸入、一個輸出。比如下面這樣:
public interface TestTopic {String OUTPUT = "example-topic";String INPUT = "example-topic";@Output(OUTPUT)MessageChannel output();@Input(INPUT)SubscribableChannel input();}通過INPUT和OUTPUT使用相同的名稱,讓生產消息和消費消息指向相同的Topic,從而實現消費自己發出的消息。
接下來,創建一個HTTP接口,并通過上面定義的輸出通道觸來生產消息,比如:
@Slf4j @RestController public class TestController {@Autowiredprivate TestTopic testTopic;@GetMapping("/sendMessage")public String messageWithMQ(@RequestParam String message) {testTopic.output().send(MessageBuilder.withPayload(message).build());return "ok";}}已經有生產消息的實現,下面來創建對輸入通道的監聽,以實現消息的消費邏輯。
@Slf4j @Component public class TestListener {@StreamListener(TestTopic.INPUT)public void receive(String payload) {log.info("Received: " + payload);throw new RuntimeException("BOOM!");}}最后,在應用主類中,使用@EnableBinding注解來開啟它,比如:
@EnableBinding(TestTopic.class) @SpringBootApplication public class TestApplication {public static void main(String[] args) {SpringApplication.run(TestApplication.class, args);}}看似天衣無縫的操作,然而在啟動的瞬間,你可能收到了下面這樣的錯誤:
org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.didispace.stream.TestTopic; factoryMethodName=input; initMethodName=null; destroyMethodName=nullat org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:64) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:54) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:86) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:562) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:541) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:76) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:45) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:358) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_151]at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:357) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:328) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:233) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:271) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:91) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:694) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:532) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:780) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.run(SpringApplication.java:1277) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.run(SpringApplication.java:1265) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at com.didispace.stream.TestApplication.main(TestApplication.java:13) [classes/:na]- 正確姿勢
根據錯誤提示:Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists,沒有啟動成功的原因是已經存在了一個名為example-topic的Bean,那么為什么會重復創建這個Bean呢?
實際上,在F版的Spring Cloud Stream中,當我們使用@Output和@Input注解來定義消息通道時,都會根據傳入的通道名稱來創建一個Bean。而在上面的例子中,我們定義的@Output和@Input名稱是相同的,因為我們系統輸入和輸出是同一個Topic,這樣才能實現對自己生產消息的消費。
既然這樣,我們定義相同的通道名是行不通了,那么我們只能通過定義不同的通道名,并為這兩個通道配置相同的目標Topic來將這一對輸入輸出指向同一個實際的Topic。對于上面的錯誤程序,只需要做如下兩處改動:
第一步:修改通道名,使用不同的名字
public interface TestTopic {String OUTPUT = "example-topic-output";String INPUT = "example-topic-input";@Output(OUTPUT)MessageChannel output();@Input(INPUT)SubscribableChannel input();}第二步:在配置文件中,為這兩個通道設置相同的Topic名稱,比如:
spring.cloud.stream.bindings.example-topic-input.destination=aaa-topic spring.cloud.stream.bindings.example-topic-output.destination=aaa-topic這樣,這兩個輸入輸出通道就會都指向名為aaa-topic的Topic了。
最后,再啟動該程序,沒有報錯。然后訪問接口:localhost:8080/sendMessage?message=hello-didi,可以在控制臺中看到如下信息:
消費自己生產的消息成功了!讀者也還可以訪問一下應用的/actuator/beans端點,看看當前Spring上下文中有哪些Bean,應該可以看到有下面Bean,也就是上面分析的兩個通道的Bean對象
"example-topic-output": {"aliases": [],"scope": "singleton","type": "org.springframework.integration.channel.DirectChannel","resource": null,"dependencies": [] }, "example-topic-input": {"aliases": [],"scope": "singleton","type": "org.springframework.integration.channel.DirectChannel","resource": null,"dependencies": [] },- 后記
其實大部分開發者在使用Spring Cloud Stream時候碰到的問題都源于對Spring Cloud Stream的核心概念還是不夠理解。所以,還是推薦讀一下下面的文章和示例:
-
入門示例
-
核心概念
-
消費組
-
消費分區
-
代碼示例
本文示例讀者可以通過查看下面倉庫的中的stream-consumer-self項目:
- Github
- Gitee
本文由 程序猿DD-翟永超 創作,采用 CC BY 3.0 CN協議 進行許可。 可自由轉載、引用,但需署名作者且注明文章出處。如轉載至微信公眾號,請在文末添加作者公眾號二維碼。
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的Spring Cloud Stream如何消费自己生产的消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Cloud构建微服务架构:
- 下一篇: Java编程思想——到底选择合成还是继承