Spring Boot Applications: Message Queues with RabbitMQ
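Preface
This article shows how to send and receive messages with RabbitMQ in Spring Boot.
Preparing a RabbitMQ cluster
For details, see the article on setting up a RabbitMQ cluster with Docker.
Creating the project
Configuration properties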
# http://segmentfault.com/a/1190000004309900
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
logging.level.org.springframework.amqp=ERROR
logging.level.com.patterncat=INFO

# spring.rabbitmq.dynamic                    Whether to create an AmqpAdmin bean. Default: true
# spring.rabbitmq.listener.acknowledge-mode  The acknowledge mode.
# spring.rabbitmq.listener.auto-startup      Whether to start the listener at startup. Default: true
# spring.rabbitmq.listener.concurrency       Minimum number of consumers.
# spring.rabbitmq.listener.max-concurrency   Maximum number of consumers.
# spring.rabbitmq.listener.prefetch          Number of messages handled in a single request; with transactions, it must be greater than or equal to the transaction size.
# spring.rabbitmq.listener.transaction-size  Number of messages processed per transaction; ideally less than or equal to prefetch.
# spring.rabbitmq.requested-heartbeat        Heartbeat timeout; 0 means none.
# spring.rabbitmq.ssl.enabled                Whether to enable SSL. Default: false
# spring.rabbitmq.ssl.key-store              Path to the key store holding the SSL certificate.
# spring.rabbitmq.ssl.key-store-password     Password for the key store.
# spring.rabbitmq.ssl.trust-store            Trust store holding the SSL certificates.
# spring.rabbitmq.ssl.trust-store-password   Password for the trust store.

Producer configuration
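import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

@Configuration
public class ProducerConfig {

    @Bean
    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    // Durable queue, declared on the broker as soon as the bean is created.
    @Bean
    Queue queueFoo(RabbitAdmin rabbitAdmin) {
        Queue queue = new Queue("queue.foo", true);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }

    @Bean
    Queue queueBar(RabbitAdmin rabbitAdmin) {
        Queue queue = new Queue("queue.bar", true);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }

    @Bean
    TopicExchange exchange(RabbitAdmin rabbitAdmin) {
        TopicExchange topicExchange = new TopicExchange("exchange");
        rabbitAdmin.declareExchange(topicExchange);
        return topicExchange;
    }

    // Bind each queue to the exchange, using the queue name as the routing key.
    @Bean
    Binding bindingExchangeFoo(Queue queueFoo, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        Binding binding = BindingBuilder.bind(queueFoo).to(exchange).with("queue.foo");
        rabbitAdmin.declareBinding(binding);
        return binding;
    }

    @Bean
    Binding bindingExchangeBar(Queue queueBar, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        Binding binding = BindingBuilder.bind(queueBar).to(exchange).with("queue.bar");
        rabbitAdmin.declareBinding(binding);
        return binding;
    }

    /**
     * Used by the producer.
     */
    @Bean
    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();
        rabbitMessagingTemplate.setMessageConverter(jackson2Converter());
        rabbitMessagingTemplate.setRabbitTemplate(rabbitTemplate);
        return rabbitMessagingTemplate;
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }
}

Consumer configuration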
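import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {

    @Autowired
    ReceiverService receiverService;

    // Converts incoming JSON payloads into the listener method's parameter type.
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // factory.setPrefetchCount(5);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }
}

Sending and receiving messages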
Sender service
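The services in this section exchange Foo and Bar payloads, but the article never shows those classes. The following is a minimal sketch of what they might look like, inferred only from the calls that appear in the code (new Foo(String), foo.getName(), new Bar(int), bar.getAge()). The field names and the no-argument constructors are assumptions; a Jackson-based converter generally needs a no-argument constructor and accessors to rebuild the object from JSON. Both classes are package-private here so they fit in one file; in a real project each would normally be a public class in its own file.

```java
// Hypothetical payload classes: not shown in the article, inferred from usage.
class Foo {
    private String name;

    // No-argument constructor (assumed) so a JSON converter can instantiate the class.
    public Foo() {
    }

    public Foo(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

class Bar {
    private int age;

    // No-argument constructor (assumed) so a JSON converter can instantiate the class.
    public Bar() {
    }

    public Bar(int age) {
        this.age = age;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
```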
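import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SenderService {

    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplate;

    // Arguments: exchange name, routing key, payload.
    public void sendFoo2Rabbitmq(final Foo foo) {
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue.foo", foo);
    }

    public void sendBar2Rabbitmq(final Bar bar) {
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue.bar", bar);
    }
}

Calling the sender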
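import java.util.Random;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = "com.patterncat")
public class RabbitmqdemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqdemoApplication.class, args);
    }

    @Autowired
    SenderService senderService;

    @Override
    public void run(String... strings) throws Exception {
        Random random = new Random();
        // Keeps publishing test messages until the process is stopped.
        while (true) {
            senderService.sendBar2Rabbitmq(new Bar(random.nextInt()));
            senderService.sendFoo2Rabbitmq(new Foo(UUID.randomUUID().toString()));
        }
    }
}

Receiving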
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ReceiverService {

    @RabbitListener(queues = "queue.foo")
    public void receiveFooQueue(Foo foo) {
        System.out.println("Received Foo<" + foo.getName() + ">");
    }

    @RabbitListener(queues = "queue.bar")
    public void receiveBarQueue(Bar bar) {
        System.out.println("Received Bar<" + bar.getAge() + ">");
    }
}

Viewing the output
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.2.RELEASE)

2016-02-02 02:26:07.099 INFO 2185 --- [ main] com.patterncat.RabbitmqdemoApplication : Starting RabbitmqdemoApplication on Jupiter.local with PID 2185 (/Users/caibosi/workspace/rabbitmqdemo/target/classes started by caibosi in /Users/caibosi/workspace/rabbitmqdemo)
2016-02-02 02:26:07.101 INFO 2185 --- [ main] com.patterncat.RabbitmqdemoApplication : No active profile set, falling back to default profiles: default
2016-02-02 02:26:07.166 INFO 2185 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@15b3e5b: startup date [Tue Feb 02 02:26:07 CST 2016]; root of context hierarchy
2016-02-02 02:26:08.004 INFO 2185 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [class org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$d94c0656] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-02-02 02:26:08.976 INFO 2185 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-02-02 02:26:08.981 INFO 2185 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147482648
2016-02-02 02:26:08.981 INFO 2185 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2016-02-02 02:26:09.029 INFO 2185 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
Received Foo<062226b0-cc27-433c-8ba9-201379f2d7c4>
Received Bar<-1717186171>
Received Foo<b2ac3cb9-2dfd-4bf1-8975-06aaca270096>
Received Bar<1094787449>
Received Foo<d63160f5-f919-4122-a995-185edcb6c231>
Received Bar<-39441298>
Received Foo<235369f1-0b59-4c4f-9a51-2277f3179798>
Received Bar<-596340646>
Received Foo<ef6e596c-b088-4b83-8b5c-e78e7ceaabcd>
Received Bar<-915839285>
Received Foo<17fb113a-8845-473a-9f46-f850526f6f4d>
Received Bar<-75651721>
Received Foo<d796bb56-478a-41e8-a2ed-b5944ae01642>
Received Bar<-1210351662>
Received Foo<23700d4a-26f9-4280-836d-9f6d63c0a3a0>
Received Bar<-2096776841>
Received Foo<d10c28d7-2c75-4b7d-b51b-992b98101018>
Received Bar<-1986644405>

Viewing the RabbitMQ management UI
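Queues
Two application instances are running here, each with two receivers, and prefetch is set to 5, so the number of messages awaiting ack stays at 2*2*5=20.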
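The arithmetic behind that figure can be written out as a small sketch. It assumes each receiver runs a single consumer and that every consumer keeps a full prefetch window of unacknowledged messages while the producer keeps the queues busy. The class and method names are hypothetical.

```java
// Hypothetical helper: estimates the steady-state count of unacknowledged messages.
class UnackedEstimate {

    // One prefetch window per consumer, one consumer per listener (assumed).
    static int maxUnacked(int instances, int listenersPerInstance, int prefetch) {
        return instances * listenersPerInstance * prefetch;
    }

    public static void main(String[] args) {
        // Two instances, two receivers each, prefetch of 5.
        System.out.println(maxUnacked(2, 2, 5)); // prints 20
    }
}
```

Raising the prefetch or starting another instance scales the figure linearly, which is a useful check when reading the unacked column in the management UI.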
Channels
Connections
Pitfalls
The MimeType warning
2016-02-02 02:19:04.444 WARN 2168 --- [ main] o.s.amqp.support.SimpleAmqpHeaderMapper : skipping header 'contentType' since it is not of expected type [org.springframework.util.MimeType]
The source I found for this: "DefaultAmqpHeaderMapper should accept contentType headers containing org.springframework.util.MimeType".
This is not resolved; for now the amqp log level is set to ERROR.
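Notes on declarations
To make automation easier, you can declare the queue, topic, and exchange in code. Finally, remember to declare the binding as well; otherwise no messages will be received.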
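Why a missing binding means no messages can be illustrated with a small in-memory model of a topic exchange. This is only a sketch of the routing rule, not RabbitMQ or Spring AMQP code: the class and its methods are hypothetical. It relies on the standard topic-exchange semantics, where a routing key is a dot-separated list of words, `*` in a binding pattern matches exactly one word, and `#` matches zero or more words.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of topic-exchange routing (not a RabbitMQ API).
class TopicRoutingSketch {

    // Each entry is {queueName, bindingPattern}.
    private final List<String[]> bindings = new ArrayList<>();

    void bind(String queue, String pattern) {
        bindings.add(new String[] {queue, pattern});
    }

    // Returns the queues that would receive a message with this routing key.
    List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        String[] keyWords = routingKey.split("\\.");
        for (String[] binding : bindings) {
            if (matches(binding[1].split("\\."), 0, keyWords, 0)) {
                queues.add(binding[0]);
            }
        }
        return queues;
    }

    // Recursive word-by-word match of a binding pattern against a routing key.
    static boolean matches(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may consume zero or more words.
            for (int next = j; next <= key.length; next++) {
                if (matches(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return matches(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        TopicRoutingSketch exchange = new TopicRoutingSketch();
        // No bindings yet: the message has nowhere to go.
        System.out.println(exchange.route("queue.foo").isEmpty());
        exchange.bind("queue.foo", "queue.foo");
        exchange.bind("queue.bar", "queue.bar");
        System.out.println(exchange.route("queue.foo"));
    }
}
```

In this model, as with the bindings declared in ProducerConfig, each queue is bound with its own name as the pattern, so a message sent with routing key queue.foo reaches only queue.foo. Before any binding exists, route returns an empty list, which corresponds to the consumer never receiving anything.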
Project on GitHub