生活随笔
收集整理的這篇文章主要介紹了
Springboot RabbitMQ
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
轉(zhuǎn)載:Springboot RabbitMQ
Springboot RabbitMQ 開發(fā),Idea 的文件目錄:
安裝過程我就不寫了,服務(wù)的安裝請參考前往:RabbitMQ Centos7 安裝以及使用
https://blog.csdn.net/yexiaomodemo/article/details/80473411
同樣,RabbitMQ里面的運(yùn)行機(jī)制等如:虛擬地址、交換機(jī)、路由鍵、隊列、Direct、Topic、Fanout 等幾種模式請自行學(xué)習(xí),這里只做Springboot RabbitMQ 的實現(xiàn),開始貼代碼。
我這服務(wù)器大家可以用,不過別攻擊哈,性能不是很好。
?
Pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> <relativePath/> </parent> <groupId>com.xing.rabbitmq</groupId> <artifactId>springboot-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-rabbitmq</name> <description>springbootrabbitmq project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
application.yml
# server continer threads ,connectionsserver:port: 9081uri-encoding: UTF-8max-threads: 100max-connections: 5000 spring:application:name: spirng-boot-rabbitmqrabbitmq:host: 47.106.203.79port: 5672username: liuxingpassword: liuxingpublisher-confirms: true #publisher-returns: true virtual-host: /liuxingconnection-timeout: 1500devtools:restart:enabled: false
?
package com.xing.rabbitmq.config; import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration; @Configurationpublic class ExchangeConfig { public static final String DIRECT_EXCHANGE = "direct_exchange"; public static final String FANOUT_EXCHANGE = "fanout_exchange"; public static final String TOPIC_EXCHANGE = "topic_exchange"; @Bean public DirectExchange directExchange(){DirectExchange directExchange = new DirectExchange( ExchangeConfig.DIRECT_EXCHANGE,true,false); return directExchange;} @Bean public FanoutExchange fanoutExchange(){FanoutExchange fanoutExchange = new FanoutExchange( ExchangeConfig.FANOUT_EXCHANGE,true,false); return fanoutExchange;} @Bean public TopicExchange topicExchange(){TopicExchange topicExchange = new TopicExchange( ExchangeConfig.TOPIC_EXCHANGE,true,false); return topicExchange;} }
package com.xing.rabbitmq.config; import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration; @Configurationpublic class QueueConfig { public static final String QUEUE_DIRECT_NAME = "direct_queue"; public static final String QUEUE_FANOUT_NAME1 = "fanout_queue1"; public static final String QUEUE_FANOUT_NAME2 = "fanout_queue2"; public static final String COM_TOPIC_QUEUE_LIU = "com.topic.queue.liu"; public static final String COM_TOPIC_QUEUE_XING = "com.topic.queue.xing"; @Bean public Queue DirectQueue() { return new Queue( QUEUE_DIRECT_NAME ,true,false,false);} @Bean public Queue fanoutQueue1() { return new Queue(QUEUE_FANOUT_NAME1 ,true,false,false);} @Bean public Queue fanoutQueue2() { return new Queue(QUEUE_FANOUT_NAME2 ,true,false,false);} @Bean public Queue topicQueueLiu() { return new Queue( COM_TOPIC_QUEUE_LIU ,true,false,false);} @Bean public Queue topicQueueXing() { return new Queue( COM_TOPIC_QUEUE_XING ,true,false,false);} }
package com.xing.rabbitmq.config; import com.xing.rabbitmq.mqcallback.MsgSendConfirmCallBack;import com.xing.rabbitmq.mqcallback.MsgSendReturnCallback;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration; @Configurationpublic class RabbitMqConfig { @Autowired private ConnectionFactory connectionFactory; public static final String ROUTIN_DIRECT_KEY = "queue_direct_key"; public static final String ROUTIN_FANOUT_KEY = "queue_fanout_key"; public static final String ROUTIN_TOPIC_KEY = "com.topic.queue.*"; @Autowired private QueueConfig queueConfig; @Autowired private ExchangeConfig exchangeConfig; @Bean public Binding binding_direct() { return BindingBuilder.bind(queueConfig.DirectQueue()).to(exchangeConfig.directExchange()).with( RabbitMqConfig.ROUTIN_DIRECT_KEY );} @Bean public Binding binding_fanout1() { return BindingBuilder.bind(queueConfig.fanoutQueue1()).to(exchangeConfig.fanoutExchange()) ; } @Bean public Binding binding_fanout2() { return BindingBuilder.bind(queueConfig.fanoutQueue2()).to(exchangeConfig.fanoutExchange()) ; } @Bean public Binding binding_topic_liu() { return BindingBuilder.bind(queueConfig.topicQueueLiu()).to(exchangeConfig.topicExchange()).with( RabbitMqConfig.ROUTIN_TOPIC_KEY );} @Bean public Binding binding_topic_xing() { return BindingBuilder.bind(queueConfig.topicQueueXing()).to(exchangeConfig.topicExchange()).with( RabbitMqConfig.ROUTIN_TOPIC_KEY );} @Bean public RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setConfirmCallback(msgSendConfirmCallBack()); template.setReturnCallback(msgSendReturnCallback());template.setMandatory(true); return template;} @Bean public MsgSendConfirmCallBack msgSendConfirmCallBack(){ return new MsgSendConfirmCallBack();} @Bean public MsgSendReturnCallback msgSendReturnCallback(){ return new MsgSendReturnCallback();} }
package com.xing.rabbitmq.controller; import com.xing.rabbitmq.sender.direct.FirstDirectSender;import com.xing.rabbitmq.sender.fanout.FirstFanoutSender;import com.xing.rabbitmq.sender.topic.TopicSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController; import java.util.UUID;@RestControllerpublic class SendController { @Autowired private FirstDirectSender firstDirectSender; @Autowired private FirstFanoutSender firstFanoutSender; @Autowired private TopicSender topicSender ; @GetMapping("/directSend") public String directSend(String message){String uuid = UUID.randomUUID().toString();firstDirectSender.send(uuid,message); return uuid;} @GetMapping("/fanoutSend") public String fanoutSend(String message){String uuid = UUID.randomUUID().toString();firstFanoutSender.send( uuid,message ); return uuid;} @GetMapping("/topicSend") public String topicSend(String message){String uuid = UUID.randomUUID().toString();topicSender.send( uuid,message ); return uuid;} }
package com.xing.rabbitmq.mqcallback; import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate; public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { public void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("MsgSendConfirmCallBack , 回調(diào)id:" + correlationData); if (ack) {System.out.println("消息消費(fèi)成功");} else {System.out.println("消息消費(fèi)失敗:" + cause+"\n重新發(fā)送");}} }
package com.xing.rabbitmq.mqcallback; import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate; public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("回饋消息:"+message);} }
package com.xing.rabbitmq.receiver.direct; import com.xing.rabbitmq.config.QueueConfig;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component; @Componentpublic class DirectConsumer { public static final Logger logger = LoggerFactory.getLogger( DirectConsumer.class ) ; @RabbitListener(queues = { QueueConfig.QUEUE_DIRECT_NAME }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { logger.info("直連模式隊列:{}, 消費(fèi)者: 接收到消息如下:{} " , QueueConfig.QUEUE_DIRECT_NAME , message);} }
package com.xing.rabbitmq.receiver.fanout; import com.xing.rabbitmq.config.QueueConfig;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component; @Componentpublic class FanoutConsumer1 { public static final Logger logger = LoggerFactory.getLogger( FanoutConsumer1.class ) ; @RabbitListener(queues = { QueueConfig.QUEUE_FANOUT_NAME1 }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { logger.info("廣播機(jī)制隊列:{}, 第一個消費(fèi)者: 接收到消息如下:{} " , QueueConfig.QUEUE_FANOUT_NAME1 , message);} }
package com.xing.rabbitmq.receiver.fanout; import com.xing.rabbitmq.config.QueueConfig;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component; @Componentpublic class FanoutConsumer2 { public static final Logger logger = LoggerFactory.getLogger( FanoutConsumer2.class ) ; @RabbitListener(queues = { QueueConfig.QUEUE_FANOUT_NAME2 }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { logger.info("廣播機(jī)制隊列:{}, 第二個消費(fèi)者: 接收到消息如下:{} " , QueueConfig.QUEUE_FANOUT_NAME2 , message);} }
package com.xing.rabbitmq.receiver.topic; import com.xing.rabbitmq.config.QueueConfig;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component; @Componentpublic class TopicLiuConsumer { public static final Logger logger = LoggerFactory.getLogger( TopicLiuConsumer.class ) ; @RabbitListener(queues = { QueueConfig.COM_TOPIC_QUEUE_LIU }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { logger.info("TOPIC隊列:{} 匹配模式消費(fèi)者: 接收到消息如下:{} " , QueueConfig.COM_TOPIC_QUEUE_LIU , message);} }
package com.xing.rabbitmq.receiver.topic; import com.xing.rabbitmq.config.QueueConfig;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component; @Componentpublic class TopicXingConsumer { public static final Logger logger = LoggerFactory.getLogger( TopicXingConsumer.class ) ; @RabbitListener(queues = { QueueConfig.COM_TOPIC_QUEUE_XING }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { logger.info("TOPIC隊列:{} 匹配模式消費(fèi)者: 接收到消息如下:{} " , QueueConfig.COM_TOPIC_QUEUE_XING , message);} }
package com.xing.rabbitmq.sender.direct; import com.xing.rabbitmq.config.ExchangeConfig;import com.xing.rabbitmq.config.QueueConfig;import com.xing.rabbitmq.config.RabbitMqConfig;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; import java.util.UUID; @Componentpublic class FirstDirectSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(String uuid,Object message) {CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend( ExchangeConfig.DIRECT_EXCHANGE , RabbitMqConfig.ROUTIN_DIRECT_KEY ,message, correlationId);} }
package com.xing.rabbitmq.sender.fanout; import com.xing.rabbitmq.config.ExchangeConfig;import com.xing.rabbitmq.config.RabbitMqConfig;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; @Componentpublic class FirstFanoutSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(String uuid,Object message) {CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend( ExchangeConfig.FANOUT_EXCHANGE , RabbitMqConfig.ROUTIN_FANOUT_KEY ,message, correlationId ); } }
package com.xing.rabbitmq.sender.topic; import com.xing.rabbitmq.config.ExchangeConfig;import com.xing.rabbitmq.config.RabbitMqConfig;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; @Componentpublic class TopicSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(String uuid,Object message) {CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend( ExchangeConfig.TOPIC_EXCHANGE , RabbitMqConfig.ROUTIN_TOPIC_KEY ,message, correlationId ); } }
package com.xing.rabbitmq; import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplicationpublic class SpringbootRabbitmqApplication { public static void main(String[] args) {SpringApplication.run(SpringbootRabbitmqApplication.class, args);} }
基礎(chǔ)運(yùn)行時序圖:
使用方式:
啟動:SpringbootRabbitmqApplication訪問:SendController.class里面的三個鏈接,有對應(yīng)的日志打印出來,具體的拓?fù)鋱D請查看文件:Springboot RabbitMQ運(yùn)行時序圖.vsd
部分代碼參考的網(wǎng)友,補(bǔ)全了另外的幾種方式的應(yīng)用,也補(bǔ)齊了注釋
總結(jié)
以上是生活随笔為你收集整理的Springboot RabbitMQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。