javascript
ActiveMQ结合Spring收发消息
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
<!--定義主題(Topic)--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg value="Topic-zy"/> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,利用它發(fā)送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory" /><property name="defaultDestination" ref="topicDestination" /><property name="receiveTimeout" value="10000" /><!-- true是topic,false是queue,默認是false --><property name="pubSubDomain" value="true" /> </bean> <!-- 配置消息隊列監(jiān)聽者(Queue or Topic) --> <bean id="messageListener" class="com.service.TopicMessageListener" /> <!-- 顯示注入消息監(jiān)聽容器,配置連接工廠,監(jiān)聽的目標是QueueDestination,監(jiān)聽器是上面定義的監(jiān)聽器 --> <bean id="ListenerContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="destination" ref="topicDestination" /><property name="messageListener" ref="messageListener" /> </bean> 復制代碼 配置 connectionFactory connectionFactory 是 Spring 用于創(chuàng)建到 JMS 服務器鏈接的,Spring 提供了多種 connectionFactory。 復制代碼<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
配置Queue 配置Topic 配置JMS消息模板——jmsTemplate 最后,在 applicationContext.xml 中引入配置好的 ActiveMQ.xml 以上就是配置文件相關的,下面是具體的業(yè)務代碼。 消息生產(chǎn)者服務 復制代碼@Service public class ProducerService { @Autowired private JmsTemplate jmsTemplate; //使用默認目的地 public void sendMessageDefault(final String msg){ Destination destination = jmsTemplate.getDefaultDestination(); System.out.println("向隊列: " + destination + " 成功發(fā)送一條消息"); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } //可指定目的地 public void sendMessage(Destination destination,final String msg){ jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } } 消息消費者服務
@Service public class ConsumerService { @Autowired private JmsTemplate jmsTemplate; //從指定的Destination接收消息 public TextMessage recive(Destination destination){ TextMessage message = (TextMessage) jmsTemplate.receive(destination); try { System.out.println("從隊列" + destination.toString() + "收到了消息" + message.getText()); } catch (JMSException e) { e.printStackTrace(); } return message; } //從默認的Destination接收消息 public void reciveDefault(){ Destination destination = jmsTemplate.getDefaultDestination(); jmsTemplate.setReceiveTimeout(5000); while(true){ TextMessage message = (TextMessage) jmsTemplate.receive(destination); try { //這里還是同一個消費者 System.out.println("消費者 從目的地 " + destination.toString() + " 收到了消息" + message.getText()); } catch (JMSException e) { e.printStackTrace(); } } } } 生產(chǎn)者
直接在 main 方法中獲取 ApplicationContext 運行,便于測試。 復制代碼@Component public class MsgProducer { @Autowired private ProducerService producerService; public void send(){ System.out.println("生產(chǎn)者開始發(fā)送消息:"); for(int i = 1; i < 11; i++){ String msg = "生產(chǎn)者發(fā)出的消息"; producerService.sendMessageDefault(msg + "-----" + i); } } public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml"); MsgProducer msgProducer = context.getBean(MsgProducer.class); msgProducer.send(); } } 消費者
@Component public class MsgConsumer { @Autowired private ConsumerService consumerService; public void recive(){ System.out.println("消費者 1 開始接收消息:"); consumerService.reciveDefault(); } public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml"); MsgConsumer msgConsumer = context.getBean(MsgConsumer.class); msgConsumer.recive(); } } 接下來就可以啟動項目。同樣是使用兩種方式測試。
第一種方式————點對點(Queue)同步的方式先啟動生產(chǎn)者發(fā)送10條消息, 再啟動消費者,可以看到控制臺顯示成功收到10條消息。 復制代碼圖片描述(最多50字)
圖片描述(最多50字)
異步監(jiān)聽的方式通過監(jiān)聽器即可實現(xiàn)異步接收消息的效果,而不是像上面使用 while() 輪詢同步的方式。項目中一般都是使用異步監(jiān)聽的方式,在 A 服務中發(fā)送了一條消息,B 服務可以利用消息監(jiān)聽器監(jiān)聽,當收到消息后,進行相應的操作。消息監(jiān)聽器(3種)通過繼承 JMS 中的 MessageListener 接口,實現(xiàn) onMessage() 方法,就可以自定義監(jiān)聽器。這是最基本的監(jiān)聽器。(可根據(jù)業(yè)務實現(xiàn)自定義的功能)另外spring也給我們提供了其他類型的消息監(jiān)聽器,比如 SessionAwareMessageListener,它的作用不僅可以接收消息,還可以發(fā)送一條消息通知對方表示自己收到了消息。(還有一種是 MessageListenerAdapter)一個簡單的自定義監(jiān)聽器如下:收到消息后打印消息 復制代碼public class QueueMessageListener implements MessageListener { public void onMessage(Message message) { //如果有消息 TextMessage tmessage = (TextMessage) message; try { if(tmessage != null){ System.out.println("監(jiān)聽器監(jiān)聽消息:"+tmessage.getText()); } } catch (JMSException e) { e.printStackTrace(); } } } 在 ActiveMQ.xml 中引入消息監(jiān)聽器:
<bean id="queueMessageListener" class="com.service.QueueMessageListener" /> 復制代碼 <bean id="queueListenerContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="destination" ref="QueueDestination" /><!--<property name="destination" ref="topicDestination" />--><property name="messageListener" ref="queueMessageListener" /> </bean> 可以看到,當使用消息監(jiān)聽器之后,每發(fā)送一條消息立馬就會被監(jiān)聽到: 復制代碼圖片描述(最多50字)
第二種方式————發(fā)布/訂閱(Topic)同步的方式類似點對點中同步的方式,只是每個消費者都能收到生產(chǎn)者發(fā)出的全部消息,不再贅述。異步監(jiān)聽的方式啟動兩個監(jiān)聽器(兩個消費者),對消息進行異步監(jiān)聽。看是否各自能收到生產(chǎn)者發(fā)送的消息。 復制代碼圖片描述(最多50字)
可以看到,每個監(jiān)聽器各自都收到了生產(chǎn)者發(fā)送的10條消息。復制代碼總結
以上是生活随笔為你收集整理的ActiveMQ结合Spring收发消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ansible-playbook 手工编
- 下一篇: Android 使用Adapter适配器