springboot 集成rabbitmq 实例
springboot 集成rabbitmq 實例
個人在學習rabbitmq時發現網上很少有系統性介紹springboot和rabbitmq如何集成的,其他人總結的都片段化,所以結合個人調研過程,整理此篇文章。
本文章共分為以下部分:
- rabbitmq簡介
- springboot配置
- rabbitmq生產者配置
- rabbitmq消費者配置
- 問題補充
一、rabbitmq簡介
目前流程的消息隊列主要有:ActivityMQ/kafka/redis/rabbitmq等,各有各自的應用場景,關于各個框架的介紹,大家可自行百度,網上很多文章介紹~其中rabbit因為其ack特性以及還算不錯的性能被大多數公司采用。
概念:
- 生產者 消息的產生方,負責將消息推送到消息隊列
- 消費者 消息的最終接受方,負責監聽隊列中的對應消息,消費消息
- 隊列 消息的寄存器,負責存放生產者發送的消息
- 交換機 負責根據一定規則分發生產者產生的消息
- 綁定 完成交換機和隊列之間的綁定
模式:
- direct
直連模式,用于實例間的任務分發 - topic
話題模式,通過可配置的規則分發給綁定在該exchange上的隊列 - headers
適用規則復雜的分發,用headers里的參數表達規則 - fanout
分發給所有綁定到該exchange上的隊列,忽略routing key
安裝
單機版安裝很簡單,大概步驟如下:
# 安裝erlang包yum install erlang # 安裝socatyum install socat # 安裝rabbit? ? rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm? # 啟動服務rabbitmq-server start # 增加管理控制功能rabbitmq-plugins?enable?rabbitmq_management # 增加用戶:sudo rabbitmqctl add_user root passwordrabbitmqctl?set_user_tags root?administrator?rabbitmqctl?set_permissions?-p?/ root '.*' '.*' '.*'集群安裝,可參考以下博客:
? ? ?
rabbitmq集群安裝
二、springboot配置
廢話少說直接上代碼:
配置參數
application.yml:
java config讀取參數
/*** RabbitMq配置文件讀取類** @author chenhf* @create 2017-10-23 上午9:31**/ @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitMqConfig {@Value("${spring.rabbitmq.addresses}")private String addresses;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.publisher-confirms}")private Boolean publisherConfirms;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;// 構建mq實例工廠@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(addresses);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){return new RabbitAdmin(connectionFactory);}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(){RabbitTemplate template = new RabbitTemplate(connectionFactory());return template;} }三、rabbitmq生產者配置
主要配置了直連和話題模式,其中話題模式設置兩個隊列(queueTopicTest1、queueTopicTest2),此兩個隊列在和交換機綁定時分別設置不同的routingkey(.TEST.以及lazy.#)來驗證匹配模式。
/*** 用于配置交換機和隊列對應關系* 新增消息隊列應該按照如下步驟* 1、增加queue bean,參見queueXXXX方法* 2、增加queue和exchange的binding* @author chenhf* @create 2017-10-23 上午10:33**/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class RabbitMqExchangeConfig {/** logger */private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);/*** @Author:chenhf* @Description: 主題型交換機* @Date:下午5:49 2017/10/23* @param* @return*/@BeanTopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());rabbitAdmin.declareExchange(contractTopicExchange);logger.debug("完成主題型交換機bean實例化");return contractTopicExchange;}/*** 直連型交換機*/@BeanDirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());rabbitAdmin.declareExchange(contractDirectExchange);logger.debug("完成直連型交換機bean實例化");return contractDirectExchange;}//在此可以定義隊列@BeanQueue queueTest(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());rabbitAdmin.declareQueue(queue);logger.debug("測試隊列實例化完成");return queue;}//topic 1@BeanQueue queueTopicTest1(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());rabbitAdmin.declareQueue(queue);logger.debug("話題測試隊列1實例化完成");return queue;}//topic 2@BeanQueue queueTopicTest2(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());rabbitAdmin.declareQueue(queue);logger.debug("話題測試隊列2實例化完成");return queue;}//在此處完成隊列和交換機綁定@BeanBinding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());rabbitAdmin.declareBinding(binding);logger.debug("測試隊列與直連型交換機綁定完成");return binding;}//topic binding1@BeanBinding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());rabbitAdmin.declareBinding(binding);logger.debug("測試隊列與話題交換機1綁定完成");return binding;}//topic binding2@BeanBinding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());rabbitAdmin.declareBinding(binding);logger.debug("測試隊列與話題交換機2綁定完成");return binding;}}在這里用到枚舉類:RabbitMqEnum
/*** 定義rabbitMq需要的常量** @author chenhf* @create 2017-10-23 下午4:07**/ public class RabbitMqEnum {/*** @param* @Author:chenhf* @Description:定義數據交換方式* @Date:下午4:08 2017/10/23* @return*/public enum Exchange {CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發"),CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"),CONTRACT_DIRECT("CONTRACT_DIRECT", "點對點");private String code;private String name;Exchange(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定義隊列名稱* creat_user: chenhf* creat_date: 2017/10/31**/public enum QueueName {TESTQUEUE("TESTQUEUE", "測試隊列"),TOPICTEST1("TOPICTEST1", "topic測試隊列"),TOPICTEST2("TOPICTEST2", "topic測試隊列");private String code;private String name;QueueName(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定義routing_key* creat_user: chenhf* creat_date: 2017/10/31**/public enum QueueEnum {TESTQUEUE("TESTQUEUE1", "測試隊列key"),TESTTOPICQUEUE1("*.TEST.*", "topic測試隊列key"),TESTTOPICQUEUE2("lazy.#", "topic測試隊列key");private String code;private String name;QueueEnum(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}}以上完成消息生產者的定義,下面封裝調用接口
測試時直接調用此工具類,testUser類需自己實現
四、rabbitmq消費者配置
springboot注解方式監聽隊列,無法手動指定回調,所以采用了實現ChannelAwareMessageListener接口,重寫onMessage來進行手動回調,詳見以下代碼,詳細介紹可以在spring的官網上找amqp相關章節閱讀
直連消費者
通過設置TestUser的name來測試回調,分別發兩條消息,一條UserName為1,一條為2,查看控制臺中隊列中消息是否被消費
topic消費者1
/*** 消費者配置** @author chenhf* @create 2017-10-30 下午3:14**/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration {@Bean("topicTest1Container")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TOPICTEST1");container.setMessageListener(exampleListener1());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("topicTest1Listener")public ChannelAwareMessageListener exampleListener1(){return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());System.out.println("TOPICTEST1:"+testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}};}}topic消費者2
/*** 消費者配置** @author chenhf* @create 2017-10-30 下午3:14**/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration2 {@Bean("topicTest2Container")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TOPICTEST2");container.setMessageListener(exampleListener());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("topicTest2Listener")public ChannelAwareMessageListener exampleListener() {return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());System.out.println("TOPICTEST2:"+testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}};}}問題補充
使用過程中可能出現的坑參考此篇文章
https://segmentfault.com/a/11...
總結
以上是生活随笔為你收集整理的springboot 集成rabbitmq 实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: openssl加密http网站过程1
- 下一篇: Android FragmentMana