企业讯息
本文是我們名為“ Spring Integration for EAI ”的學院課程的一部分。
在本課程中,向您介紹了企業應用程序集成模式以及Spring Integration如何解決它們。 接下來,您將深入研究Spring Integration的基礎知識,例如通道,轉換器和適配器。 在這里查看 !
目錄
1.簡介 2.準備環境 3. JMS適配器:接收1.簡介
本教程重點介紹如何將應用程序與Spring Integration和JMS消息傳遞集成。 為此,我將首先向您展示如何安裝Active MQ,它將是本教程中的代理。 下一部分將顯示使用Spring Integration JMS通道適配器發送和接收JMS消息的示例。 在這些示例之后,我們將看到一些通過配置消息轉換和目標解析來自定義這些調用的方法。
本教程的最后一部分簡要介紹了如何將Spring Integration與AMQP協議一起使用。 它將完成RabbitMQ的安裝,最后給出一個基本的消息傳遞示例。
本教程由以下部分組成:
2.準備環境
如果要通過JMS發送消息,則首先需要一個代理。 本教程中包含的示例是通過Active MQ(一種開源消息傳遞代理)執行的。 在本節中,我將幫助您安裝服務器并實現一個簡單的Spring應用程序,以測試它是否已正確設置。 該說明基于Windows系統。 如果您已經安裝了服務器,則跳過此部分。
第一步是從Apache.org下載Apache MQ服務器。 下載完成后,只需將其解壓縮到您選擇的文件夾中即可。
要啟動服務器,你只需要執行其位于Apache的ActiveMQ的-5.9.0 \ bin文件夾中文件的ActiveMQ。
圖1
好的,服務器正在運行。 現在我們只需要實現該應用程序。 我們將創建一個生產者,一個使用者,一個spring配置文件和一個測試。
制片人
您可以使用任何Java類代替我的TicketOrder對象。
public class JmsProducer {@Autowired@Qualifier("jmsTemplate")private JmsTemplate jmsTemplate;public void convertAndSendMessage(TicketOrder order) {jmsTemplate.convertAndSend(order);}public void convertAndSendMessage(String destination, TicketOrder order) {jmsTemplate.convertAndSend(destination, order);} }消費者
public class SyncConsumer {@Autowiredprivate JmsTemplate jmsTemplate;public TicketOrder receive() {return (TicketOrder) jmsTemplate.receiveAndConvert("test.sync.queue");} }Spring配置文件
<bean id="consumer" class="xpadro.spring.integration.consumer.SyncConsumer"/> <bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616" /> </bean><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="connectionFactory"/> </bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="defaultDestination" ref="syncTestQueue"/> </bean><!-- Destinations --> <bean id="syncTestQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="test.sync.queue"/> </bean>考試
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestJmsConfig {@Autowiredprivate JmsProducer producer;@Autowiredprivate SyncConsumer consumer;@Testpublic void testReceiving() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplate's default destinationproducer.convertAndSendMessage(order);Thread.sleep(2000);TicketOrder receivedOrder = consumer.receive();assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());} }如果測試通過,則說明所有設置正確。 現在,我們可以轉到下一部分。
3. JMS適配器:接收
Spring Integration提供了多個適配器和網關來接收來自JMS隊列或主題的消息。 下面簡要討論這些適配器:
- 入站通道適配器 :它在內部使用JmsTemplate主動從JMS隊列或主題接收消息。
- 消息驅動通道適配器 :內部使用Spring MessageListener容器被動接收消息。
入站通道適配器:活動接收
本節說明如何使用上一節中介紹的第一個適配器。
JMS入站通道適配器主動輪詢隊列以從中檢索消息。 由于它使用輪詢器,因此您必須在Spring配置文件中對其進行定義。 適配器檢索到消息后,它將通過指定的消息通道發送到消息傳遞系統中。 然后,我們可以使用端點(如轉換器,過濾器等)來處理消息,也可以將其發送給服務激活器。
本示例從JMS隊列檢索票單消息并將其發送到服務激活器,服務激活器將對其進行處理并確認訂單。 通過將訂單發送到某種存儲庫來確認該訂單,該存儲庫具有包含所有已注冊訂單的簡單列表。
我們使用與“ 2準備環境”部分中相同的生產者:
<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure --> <!-- Connection factory and jmsTemplate configuration --> <!-- as seen in the second section --><!-- Destinations --> <bean id="toIntQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="int.sync.queue"/> </bean>測試將使用生產者將消息發送到“ toIntQueue”。 現在,我們將設置Spring Integration配置:
Integration-jms.xml
<context:component-scan base-package="xpadro.spring.integration"/><int-jms:inbound-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel"/><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/><int:poller id="poller" default="true" fixed-delay="1000"/>JMS入站通道適配器將使用定義的輪詢器從“ toIntQueue”中檢索消息。 您必須為適配器配置輪詢器,否則它將拋出運行時異常。 在這種情況下,我們定義了一個默認的輪詢器。 這意味著任何需要輪詢的端點都將使用此輪詢器。 如果未配置默認輪詢器,則需要為每個主動檢索消息的端點定義一個特定的輪詢器。
消費者
服務激活器只是一個bean(通過組件掃描自動檢測到):
@Component("ticketProcessor") public class TicketProcessor {private static final Logger logger = LoggerFactory.getLogger(TicketProcessor.class);private static final String ERROR_INVALID_ID = "Order ID is invalid";@Autowiredprivate OrderRepository repository;public void processOrder(TicketOrder order) {logger.info("Processing order {}", order.getFilmId());if (isInvalidOrder(order)) {logger.info("Error while processing order [{}]", ERROR_INVALID_ID);throw new InvalidOrderException(ERROR_INVALID_ID);}float amount = 5.95f * order.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);}private boolean isInvalidOrder(TicketOrder order) {if (order.getFilmId() == -1) {return true;}return false;} }在前面的代碼片段中, processOrder方法接收一個TicketOrder對象并直接對其進行處理。 但是,您可以改為定義消息 <?>或Message <TicketOrder>以便接收消息。 這樣,您將可以訪問消息的有效負載及其標題。
還要注意,該方法返回void。 由于消息流在此處結束,因此我們不需要返回任何內容。 如果需要,您還可以定義服務適配器的回復通道并返回確認。 此外,例如,我們隨后將向該回復通道訂閱端點或網關,以便將確認發送到另一個JMS隊列,將其發送到Web服務或將其存儲到數據庫。
最后,讓我們看一下測試以了解如何執行所有測試:
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestIntegrationJmsConfig {@Autowiredprivate JmsProducer producer;@Autowiredprivate OrderRepository repository;@Testpublic void testSendToIntegration() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplate's default destinationproducer.convertAndSendMessage("int.sync.queue", order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("123", conf.getId());} }我已將Thread.sleep為四秒鐘,以等待消息發送。 我們本可以使用while循環來檢查是否已收到消息,直到達到超時為止。
入站通道適配器:無源接收
JMS接收部分的第二部分使用消息驅動的通道適配器。 這樣,消息一旦發送到隊列,便會立即傳遞到適配器,而無需使用輪詢器。 這是我們向其訂閱者傳遞消息的消息通道。
該示例與上一節中看到的示例非常相似。 我將僅顯示在配置中所做的更改。
我從上一個示例更改的唯一內容是spring集成配置:
<context:component-scan base-package="xpadro.spring.integration"/><int-jms:message-driven-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel" /><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/>我刪除了輪詢器,并更改了消息驅動通道適配器的JMS入站適配器。 而已; 適配器將被動地接收消息并將其傳遞到jmsChannel 。
請考慮到消息偵聽器適配器至少需要以下組合之一:
- 消息偵聽器容器。
- 連接工廠和目的地。
在我們的示例中,我們使用了第二個選項。 目標在適配器配置中指定,連接工廠在jms-config文件中定義,該文件也由測試導入。
4. JMS適配器:發送
在上一節中,我們已經了解了如何接收外部系統發送到JMS隊列的消息。 本節向您展示出站通道適配器,使您可以在系統之外發送JMS消息。
與入站適配器相比,出站適配器只有一種類型。 該適配器在內部使用JmsTemplate發送消息,并且為了配置此適配器,您將需要指定以下至少一項:
- 一個JmsTemplate。
- 連接工廠和目的地。
與入站示例一樣,我們使用第二個選項將消息發送到JMS隊列。 配置如下:
對于此示例,我們將為jms配置(jms-config.xml)創建一個新隊列。 這是我們的Spring Integration應用程序將消息發送到的位置:
<bean id="toJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="to.jms.queue"/> </bean>好的,現在我們使用JMS出站適配器配置集成配置:
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int-jms:outbound-channel-adapter id="jmsAdapter" channel="requestChannel" destination="toJmsQueue"/>我們正在使用網關作為郵件系統的入口。 測試將使用此接口發送新的TicketOrder對象。 網關將接收消息并將其放入requestChannel通道。 由于它是直接通道 ,它將被發送到JMS出站通道適配器。
適配器收到一個Spring Integration消息。 然后,它可以通過兩種方式發送消息:
- 將消息轉換為JMS消息。 這是通過將適配器的屬性“ extract-payload”設置為true(默認值)來完成的。 這是我們在示例中使用的選項。
- 按原樣發送消息,即Spring Integration消息。 您可以通過將“ extract-payload”屬性設置為false來完成此操作。
該決定取決于期望您的消息的系統類型。 如果另一個應用程序是Spring Integration應用程序,則可以使用第二種方法。 否則,請使用默認值。 在我們的示例中,另一端有一個簡單的Spring JMS應用程序。 因此,我們必須選擇第一個選項。
繼續我們的示例,現在我們看一下測試,該測試使用網關接口發送消息,并使用自定義使用者接收消息。 在此測試中,使用者將扮演一個JMS應用程序的角色,該應用程序使用jmsTemplate從JMS隊列中檢索它:
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-out-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestIntegrationJmsOutboundConfig {@Autowiredprivate SyncConsumer consumer;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);TicketOrder receivedOrder = consumer.receive("to.jms.queue");assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());} }5.使用網關
除了通道適配器之外,Spring Integration還提供了入站和出站網關。 您可能還記得以前的教程,網關提供了與外部系統的雙向通信,這意味著發送和接收或接收和回復操作。 在這種情況下,它允許請求或重試操作。
在本節中,我們將看到一個使用JMS出站網關的示例。 網關將向隊列發送JMS消息,并等待答復。 如果未發送回任何答復,則網關將拋出MessageTimeoutException 。
Spring Integration配置
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway id="inGateway" default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel"/><int:channel id="jmsReplyChannel"/><int:service-activator method="registerOrderConfirmation" input-channel="jmsReplyChannel" ref="ticketProcessor"/>流程如下:
訂單處理器非常簡單。 它收到TicketConfirmation并將其添加到票證存儲庫:
@Component("ticketProcessor") public class TicketProcessor {@Autowiredprivate OrderRepository repository;public void registerOrderConfirmation(TicketConfirmation confirmation) {repository.confirmOrder(confirmation);} }考試
@RunWith(SpringJUnit4ClassRunner.class) public class TestIntegrationJmsOutGatewayConfig {@Autowiredprivate OrderRepository repository;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("321", conf.getId());} }外部系統
為了完全理解該示例,我將向您展示將消息傳遞到JMS隊列時發生的情況。
偵聽Spring Integration發送消息的隊列,有一個偵聽器asyncConsumer :
<bean id="toAsyncJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="to.async.jms.queue"/> </bean><!-- Listeners --> <jms:listener-container connection-factory="connectionFactory"><jms:listener destination="to.async.jms.queue" ref="asyncConsumer"/> </jms:listener-container>偵聽器接收到該消息,并使用票證確認創建新消息并進行回復。 注意,我們必須將回復消息的相關性ID設置為與請求消息相同的值。 這將使客戶知道我們正在響應哪個消息。 另外,我們將目標設置為請求消息中配置的回復通道。
@Component("asyncConsumer") public class AsyncConsumer implements MessageListener {@Autowiredprivate JmsTemplate template;@Overridepublic void onMessage(Message order) {final Message msgOrder = order;TicketOrder orderObject;try {orderObject = (TicketOrder) ((ObjectMessage) order).getObject();} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}float amount = 5.95f * orderObject.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("321", orderObject.getFilmId(), orderObject.getOrderDate(), orderObject.getQuantity(), amount);try {template.convertAndSend(msgOrder.getJMSReplyTo(), confirmation, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setJMSCorrelationID(msgOrder.getJMSCorrelationID());return message;}});} catch (JmsException | JMSException e) {throw JmsUtils.convertJmsAccessException((JMSException) e);}} }6.消息轉換
消息通道適配器和網關都使用消息轉換器將傳入消息轉換為Java類型,或者采用相反的方式。 轉換器必須實現MessageConverter接口:
public interface MessageConverter {<P> Message<P> toMessage(Object object);<P> Object fromMessage(Message<P> message);}Spring Integration帶有MessageConverter接口的兩種實現:
MapMessageConverter
它的fromMessage方法使用兩個鍵創建一個新的HashMap:
- 有效負載:值為消息的有效負載( message.getPayload )。
- 標頭:該值是另一個HashMap,具有來自原始消息的所有標頭。
“ toMessage”方法期望一個具有相同結構(有效負載和標頭鍵)的Map實例,并構造一個Spring Integration消息。
SimpleMessageConverter
這是適配器和網關使用的默認轉換器。 您可以從源代碼中看到它與對象之間的轉換:
public Message<?> toMessage(Object object) throws Exception {if (object == null) {return null;}if (object instanceof Message<?>) {return (Message<?>) object;}return MessageBuilder.withPayload(object).build(); }public Object fromMessage(Message<?> message) throws Exception {return (message != null) ? message.getPayload() : null; }無論如何,如果需要自己的實現,則可以在通道適配器或網關配置中指定自定義轉換器。 例如,使用網關:
<int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel" message-converter="myConverter"/>只要記住您的轉換器應該實現MessageConverter:
@Component("myConverter") public class MyConverter implements MessageConverter {7. JMS支持的消息通道
通道適配器和網關用于與外部系統進行通信。 JMS支持的消息通道用于在同一應用程序內的使用者和生產者之間發送和接收JMS消息。 盡管在這種情況下我們仍然可以使用通道適配器,但是使用JMS通道要簡單得多。 與集成消息通道的區別在于,JMS通道將使用JMS代理發送消息。 這意味著消息將不僅僅存儲在內存通道中。 相反,它將被發送到JMS提供程序,從而也可以使用事務。 如果使用事務,它將按以下方式工作:
- 如果回滾事務,則將消息發送到JMS支持的通道的生產者將不會編寫該消息。
- 如果事務回滾,訂閱JMS支持的通道的使用者將不會從該通道中刪除消息。
對于此功能,Spring Integration提供了兩個渠道:點對點和發布/訂閱渠道。 它們配置如下:
點對點直接渠道
<int-jms:channel id="jmsChannel" queue="myQueue"/>發布/訂閱頻道
<int-jms:publish-subscribe-channel id="jmsChannel" topic="myTopic"/>在下面的示例中,我們可以看到一個簡單的應用程序,其中有兩個端點使用JMS支持的通道相互通信。
組態
發送到消息傳遞系統( TicketOrder對象)的消息到達服務激活器(票證處理器)。 然后,該處理器將訂單( sendJMS )發送到JMS支持的消息。 訂閱此通道,有一個相同的處理器將接收消息( receiveJms ),對其進行處理以創建TicketConfirmation并將其注冊到票證存儲庫:
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int:service-activator method="sendJms" input-channel="requestChannel" output-channel="jmsChannel" ref="ticketJmsProcessor"/><int-jms:channel id="jmsChannel" queue="syncTestQueue"/><int:service-activator method="receiveJms" input-channel="jmsChannel" ref="ticketJmsProcessor"/>處理器
實現兩種方法: sendJms和receiveJms :
@Component("ticketJmsProcessor") public class TicketJmsProcessor {private static final Logger logger = LoggerFactory.getLogger(TicketJmsProcessor.class);@Autowiredprivate OrderRepository repository;public TicketOrder sendJms(TicketOrder order) {logger.info("Sending order {}", order.getFilmId());return order;}public void receiveJms(TicketOrder order) {logger.info("Processing order {}", order.getFilmId());float amount = 5.95f * order.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);} }考試
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-jms-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestIntegrationJmsToJmsConfig {@Autowiredprivate OrderRepository repository;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("123", conf.getId());} }JMS支持的通道提供了不同的可能性,例如配置隊列名稱而不是隊列引用或使用目標解析器:
<int-jms:channel id="jmsChannel" queue-name="myQueue"destination-resolver="myDestinationResolver"/>8.動態目標解析
目標解析器是一個類,它允許我們將目標名稱解析為JMS目標。 任何目標解析器都必須實現以下接口:
public interface DestinationResolver {Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)throws JMSException; }可以在JMS通道適配器,JMS網關和JMS支持的通道上指定目標解析器。 如果您未明確配置目標解析器,Spring將使用默認實現,即DynamicDestinationResolver 。 下面將解釋該解析器作為Spring提供的其他實現:
- DynamicDestinationResolver :通過使用標準JMS Session.createTopic和Session.createQueue方法將目標名稱解析為動態目標。
- BeanFactoryDe??stinationResolver :它將在Spring上下文中查找名稱類似于提供的目標名稱的bean,并期望其類型為javax.jms.Destination 。 如果找不到,它將拋出DestinationResolutionException 。
- JndiDestinationResolver :它將假定目標名稱是JNDI位置。
如果我們不想使用默認的動態解析器??,則可以實現自定義解析器,并在所需的端點中對其進行配置。 例如,以下JMS支持的通道使用不同的實現:
<int-jms:channel id="jmsChannel" queue-name="myQueue"destination-resolver="myDestinationResolver"/>9. AMQP集成
安裝
要安裝并啟動RabbitMQ服務器,您只需要遵循以下步驟即可。 如果您已經安裝了服務器,則跳過此部分。
- http://www.erlang.org/download.html
- 下一步是下載并安裝RabbitMQ。 如果要使用與本教程相同的版本,請下載版本3.2.4。
- http://www.rabbitmq.com/
- 現在,打開命令提示符。 如果您是Windows用戶,則可以通過單擊開始菜單并在RabbitMQ文件夾中選擇RabbitMQ命令提示符直接進入。
- 激活管理插件
- > rabbitmq-plugins enable rabbitmq_management
- 啟動服務器
- > rabbitmq-server.bat
好的,現在我們將測試RabbitMQ是否已正確安裝。 轉到http:// localhost:15672并使用“ guest”作為用戶名和密碼登錄。 如果使用的是3.0之前的版本,則端口為55672。
如果您看到網絡用戶界面,則一切就緒。
演示應用
為了將AMQP與Spring Integration結合使用,我們需要在pom.xml文件中添加以下依賴項:
SpringAMQP(適用于RabbitMQ)
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.3.1.RELEASE</version> </dependency>Spring Integration AMQP端點
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-amqp</artifactId><version>3.0.2.RELEASE</version> </dependency>現在,我們將創建一個新的配置文件amqp-config.xml,其中將包含RabbitMQ配置(例如我們在本教程先前使用的JMS的jms-config)。
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><rabbit:connection-factory id="connectionFactory" /><rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /><rabbit:admin connection-factory="connectionFactory" /><rabbit:queue name="rabbit.queue" /><rabbit:direct-exchange name="rabbit.exchange"><rabbit:bindings><rabbit:binding queue="rabbit.queue" key="rabbit.key.binding" /></rabbit:bindings></rabbit:direct-exchange> </beans>下一個文件是Spring Integration文件,其中包含通道和通道適配器:
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:int="http://www.springframework.org/schema/integration"xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd"><context:component-scan base-package="xpadro.spring.integration.amqp"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.amqp.service.AMQPService"/><int:channel id="requestChannel"/><int-amqp:outbound-channel-adapterchannel="requestChannel" amqp-template="amqpTemplate" exchange-name="rabbit.exchange"routing-key="rabbit.key.binding"/><int-amqp:inbound-channel-adapter channel="responseChannel"queue-names="rabbit.queue" connection-factory="connectionFactory" /><int:channel id="responseChannel"/><int:service-activator ref="amqpProcessor" method="process" input-channel="responseChannel"/></beans>流程如下:
- 測試應用程序向網關發送一條消息,該消息將是一個簡單的String。
- 從網關,它將通過“ requestChannel”通道到達出站通道適配器。
- 出站通道適配器將消息發送到“ rabbit.queue”隊列。
- 訂閱此“ rabbit.queue”隊列,我們??已經配置了入站通道適配器。 它將接收發送到隊列的消息。
- 該消息通過“ responseChannel”通道發送到服務激活器。
- 服務激活器僅打印消息。
用作消息傳遞系統入口的網關包含一個方法:
public interface AMQPService {@Gatewaypublic void sendMessage(String message); }服務激活器amqpProcessor非常簡單。 它收到一條消息并打印其有效負載:
@Component("amqpProcessor") public class AmqpProcessor {public void process(Message<String> msg) {System.out.println("Message received: "+msg.getPayload());} }為了完成該示例,以下是通過調用網關包裝的服務來啟動流的應用程序:
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/amqp-config.xml","/xpadro/spring/integration/test/int-amqp-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestIntegrationAMQPConfig {@Autowiredprivate AMQPService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {String msg = "hello";service.sendMessage(msg);Thread.sleep(2000);} }翻譯自: https://www.javacodegeeks.com/2015/09/enterprise-messaging.html
總結
- 上一篇: activemq端口好_ActiveMQ
- 下一篇: 基于linux的软件开发(linux开发