使用Spring Integration Java DSL与Rabbit MQ集成
我最近參加了在拉斯維加斯舉行的2016年Spring會議 ,很幸運地看到了我在軟件世界中長期敬佩的一些人。 我親自遇到了其中的兩個人,他們實際上合并了幾年前我與Spring Integration相關(guān)的一些小貢獻(xiàn)– Gary Russel和Artem Bilan ,他們啟發(fā)了我重新審視我已經(jīng)有一段時間沒有使用過的Spring Integration 。
我再一次想起了Spring Integration如何使任何復(fù)雜的Enterprise集成方案看起來都很簡單。 我很高興看到基于Spring Integration Java的DSL現(xiàn)在已完全集成到Spring Integration傘和諸如Spring Cloud Stream之類的更高層次的抽象中(感謝我的好朋友和對該項目的貢獻(xiàn)者進(jìn)行了介紹)
Soby Chacko ),這使得某些消息驅(qū)動的場景更加容易。
在本文中,我只是在回顧與RabbitMQ的一個非常簡單的集成方案,在以后的文章中,將使用Spring Cloud Stream重新實現(xiàn)它。
考慮一個場景,其中兩個服務(wù)通過它們之間的RabbitMQ代理彼此通信,其中一個生成某種工作,另一個處理該工作。
制片人
可以使用Spring Integration Java DSL以以下方式在代碼中表示工作單元產(chǎn)生/分發(fā)部分:
@Configuration public class WorksOutbound {@Autowiredprivate RabbitConfig rabbitConfig;@Beanpublic IntegrationFlow toOutboundQueueFlow() {return IntegrationFlows.from("worksChannel").transform(Transformers.toJson()).handle(Amqp.outboundAdapter(rabbitConfig.worksRabbitTemplate())).get();} }這是非常容易理解的-流程首先從名為“ worksChannel”的通道讀取消息,將消息轉(zhuǎn)換為json,然后使用出站通道適配器將其分派到RabbitMQ交換。 現(xiàn)在,消息如何到達(dá)名為“ worksChannel”的通道-我已經(jīng)通過Messaging網(wǎng)關(guān)(Spring集成世界的入口)配置了消息-
@MessagingGateway public interface WorkUnitGateway {@Gateway(requestChannel = "worksChannel")void generate(WorkUnit workUnit);}因此,現(xiàn)在,如果Java客戶端想要向Rabbitmq派遣“工作單元”,則調(diào)用將如下所示:
WorkUnit sampleWorkUnit = new WorkUnit(UUID.randomUUID().toString(), definition); workUnitGateway.generate(sampleWorkUnit);我在這里刷了幾件事-特別是Rabbit Mill配置,它是在磨房中運行的,可在此處獲得
消費者
沿著生產(chǎn)者的思路,消費者流程將從接收來自RabbitMQ隊列的消息開始,將其轉(zhuǎn)換為域模型,然后處理該消息,使用Spring Integration Java DSL通過以下方式表示:
@Configuration public class WorkInbound {@Autowiredprivate RabbitConfig rabbitConfig;@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic IntegrationFlow inboundFlow() {return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, rabbitConfig.worksQueue()).concurrentConsumers(3)).transform(Transformers.fromJson(WorkUnit.class)).handle("workHandler", "process").get();} }代碼應(yīng)該很直觀,上面的workHandler是一個簡單的Java pojo,看起來像這樣,完成了僅記錄有效負(fù)載的非常重要的工作:
@Service public class WorkHandler {private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);public void process(WorkUnit workUnit) {LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());} } 本質(zhì)上就是這樣,如果使用直接Java和原始RabbitMQ庫嘗試使用Spring Integration,它將提供相當(dāng)出色的代碼外觀。
Spring Cloud Stream使整個設(shè)置變得更加簡單,將成為未來帖子的主題。
如果您有興趣嘗試一下,我已將整個代碼發(fā)布在我的github倉庫中 。
翻譯自: https://www.javacodegeeks.com/2016/08/integrating-rabbit-mq-using-spring-integration-java-dsl.html
總結(jié)
以上是生活随笔為你收集整理的使用Spring Integration Java DSL与Rabbit MQ集成的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何在路由器里设置路由器如何在路由器里
- 下一篇: 简单操作让台式机连接无线网台式电脑如何连