Java消息服务~@JmsListener集成
生活随笔
收集整理的這篇文章主要介紹了
Java消息服务~@JmsListener集成
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1、配置ActiveMQ連接工廠、JmsTemplate等
注意:需要開啟@EnableJms。
注解@EnableJms自動掃描帶有@JmsListener的Bean方法,并為其創建一個MessageListener把它包裝起來
import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate;/*** ActiveMQ配置.<br>* @author gqltt<br>*/ @Configuration @EnableJms public class ActiveMQConfig {/*** 隊列JmsTemplate的名稱.*/public static final String JMS_TEMPLATE_QUEUE = "innerJmsQueueTemplate";/*** 訂閱JmsTemplate的名稱.*/public static final String JMS_TEMPLATE_TOPIC = "innerJmsTopicTemplate";/*** 隊列JmsListenerContainerFactory的名稱.*/public static final String JMS_CONTAINER_FACTORY_QUEUE = "innerJmsQueueListenerContainerFactory";/*** 訂閱JmsListenerContainerFactory的名稱.*/public static final String JMS_CONTAINER_FACTORY_TOPIC = "innerJmsTopicListenerContainerFactory";/*** 獲取brokerURL.*/public static String getBrokerURL() {return PropertyConfigUtil.getProperty("spring.activemq.brokerURL");}/*** 獲取userName.* @return*/public static String getUserName() {return PropertyConfigUtil.getProperty("spring.activemq.userName");}/*** 獲取password.* @return*/public static String getPassword() {return PropertyConfigUtil.getProperty("spring.activemq.password");}/*** ActiveMQConnectionFactory.* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = "innerActiveMQConnectionFactory")public ActiveMQConnectionFactory getActiveMQConnectionFactory() {final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setBrokerURL(getBrokerURL());connectionFactory.setUserName(getUserName());connectionFactory.setPassword(getPassword());return connectionFactory;}/*** CachingConnectionFactory.* @param targetConnectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = "innerCachingConnectionFactory")public CachingConnectionFactory getCachingConnectionFactory(@Qualifier("abcInnerActiveMQConnectionFactory") ActiveMQConnectionFactory targetConnectionFactory) {final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();cachingConnectionFactory.setTargetConnectionFactory(targetConnectionFactory);cachingConnectionFactory.setSessionCacheSize(20);return cachingConnectionFactory;}/*** 隊列JmsTemplate.* @param connectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = JMS_TEMPLATE_QUEUE)public JmsTemplate getJmsQueueTemplate(@Qualifier("innerCachingConnectionFactory") ConnectionFactory connectionFactory) {final JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setConnectionFactory(connectionFactory);jmsTemplate.setPubSubDomain(false);jmsTemplate.setReceiveTimeout(30_000);return jmsTemplate;}/*** 訂閱JmsTemplate.* @param connectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = JMS_TEMPLATE_TOPIC)public JmsTemplate getJmsTopicTemplate(@Qualifier("innerCachingConnectionFactory") ConnectionFactory connectionFactory) {final JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setConnectionFactory(connectionFactory);jmsTemplate.setPubSubDomain(true);jmsTemplate.setReceiveTimeout(30_000);return jmsTemplate;}/*** 隊列模式JmsListenerContainerFactory.* @param connectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = JMS_CONTAINER_FACTORY_QUEUE)public JmsListenerContainerFactory<?> getJmsQueueListenerContainerFactory(@Qualifier("innerCachingConnectionFactory") ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);factory.setReceiveTimeout(30_000L);return factory;}/*** 訂閱模式JmsListenerContainerFactory.* @param connectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = JMS_CONTAINER_FACTORY_TOPIC)public JmsListenerContainerFactory<?> getJmsTopicListenerContainerFactory(@Qualifier("innerCachingConnectionFactory") ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//設置為發布訂閱方式, 默認情況下使用的生產消費者方式factory.setPubSubDomain(true);factory.setReceiveTimeout(30_000L);return factory;}}注意:使用條件判斷是否初始化ActiveMQ相關對象
import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.core.type.AnnotatedTypeMetadata;/*** 是否初始化ActiveQM的判斷.<br>* @author gqltt<br>*/ public class ActiveMQCondition implements Condition {/*** {@inheritDoc}*/@Overridepublic boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {final String brokerUrl = ActiveMQConfig.getBrokerURL();return StringUtil.isNotEmpty(brokerUrl);}}2、發送消息
/*** 發送保存成功【確認】信息.* @param gid*/private void sendPay(long gid) {try {final String msg = CastUtil.getString(gid);final JmsTemplate jmsTemplate = JmsTemplateUtil.getJmsQueueTemplate();jmsTemplate.convertAndSend(OrderListener.ORDER_PAYED_QUEUE, msg);} catch (Exception e) {if (LOG.isErrorEnabled()) {LOG.error("發送消息:" + OrderListener.ORDER_PAYED_QUEUE + "失敗。", e);}}}3、@JmsListener配置消息監聽
import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;@Component public class OrderListener {/*** 訂單支付隊列.*/public static final String ORDER_PAYED_QUEUE = "order_payed_queue";@Autowiredprivate IOrderService orderService;/*** 訂單已支付,通知改訂單狀態.* @param msg*/@JmsListener(containerFactory = ActiveMQConfig.JMS_CONTAINER_FACTORY_QUEUE, destination = ORDER_PAYED_QUEUE)public void payed(final String msg) {if (StringUtil.isNullOrTrimEmptyString(msg)) {return;}final long gid = CastUtil.getLong(msg);orderService.updatePayed(gid);} }@JmsListener 接收元素的Message對象
@Component public class MailMessageListener {final Logger logger = LoggerFactory.getLogger(getClass());@Autowired ObjectMapper objectMapper;@Autowired MailService mailService;@JmsListener(destination = "jms/queue/mail", concurrency = "10")public void onMailMessageReceived(Message message) throws Exception {logger.info("received message: " + message);if (message instanceof TextMessage) {String text = ((TextMessage) message).getText();MailMessage mm = objectMapper.readValue(text, MailMessage.class);mailService.sendRegistrationMail(mm);} else {logger.error("unable to process non-text message!");}} }參考:集成JMS - 廖雪峰的官方網站
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Java消息服务~@JmsListener集成的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 设计模式2—结构型模式
- 下一篇: 小甲鱼 OllyDbg 教程系列 (十四