RabbitMQ(4) TopicExchange
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ(4) TopicExchange
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
topic 是 RabbitMQ 中最靈活的一種交換機類型,可以根據 routing_key 的匹配規則(`*` 匹配一個單詞,`#` 匹配零個或多個單詞)自由地把消息路由到不同的隊列
生產者工程
package com.example.demo.rabbitMq.exchange.topic;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class TopicRabbitConfig {public static final String TOPIC_MESSAGE = "topic.message";public static final String TOPIC_MESSAGE_S = "topic.messages";public static final String USER_MESSAGE = "user.message";/*** 武器庫*/public static final String ARM_QUEUE = "arm.queue";@Beanpublic Queue queueTopicMessage() {return new Queue(TopicRabbitConfig.TOPIC_MESSAGE);}@Beanpublic Queue queueTopicMessages() {return new Queue(TopicRabbitConfig.TOPIC_MESSAGE_S);}@Beanpublic Queue queueUserMessage() {return new Queue(TopicRabbitConfig.USER_MESSAGE);}@Beanpublic Queue queueArm() {return new Queue(TopicRabbitConfig.ARM_QUEUE);}@BeanTopicExchange exchange() {return new TopicExchange("topicExchange");}@BeanBinding bindingExchangeMessage(Queue queueTopicMessage, TopicExchange exchange) {//所有匹配routingKey=topic.message的消息,將放入Queue[name="topic.message"]return BindingBuilder.bind(queueTopicMessage).to(exchange).with("topic.message");}@BeanBinding bindingExchangeMessages(Queue queueTopicMessages, TopicExchange exchange) {//所有匹配routingKey=topic.# 的消息,將放入Queue[name="topic.messages"]return BindingBuilder.bind(queueTopicMessages).to(exchange).with("topic.#");}@BeanBinding bindingExchangeUserMessage(Queue queueUserMessage, TopicExchange exchange) {///所有匹配routingKey=user.# 的消息,將放入Queue[name="user.messages"]return BindingBuilder.bind(queueUserMessage).to(exchange).with("user.#");}@BeanBinding bindingExchangeArm(Queue queueArm, TopicExchange exchange) {return BindingBuilder.bind(queueArm).to(exchange).with("arm.#");} }?
發送消息
package com.example.demo.rabbitMq.exchange.topic;

import com.example.demo.dto.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publishes sample {@link User} messages to the exchange named
 * {@code topicExchange}, each with a different routing key.
 */
@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /** Sends a user with routing key {@code topic.message}. */
    public void send1() {
        publishUser("Sender1.....", "1111111111", "topic.message");
    }

    /** Sends a user with routing key {@code topic.messages}. */
    public void send2() {
        publishUser("Sender2.....", "2222222", "topic.messages");
    }

    /** Sends a user with routing key {@code user.message}. */
    public void send3() {
        publishUser("Sender3.....", "33333", "user.message");
    }

    /**
     * Builds a {@link User} from the given fields and publishes it to
     * {@code topicExchange} under the given routing key.
     */
    private void publishUser(String userName, String mobile, String routingKey) {
        User payload = new User();
        payload.setUserName(userName);
        payload.setMobile(mobile);
        rabbitTemplate.convertAndSend("topicExchange", routingKey, payload);
    }
}
消費者工程
package com.example.demo.rabbitMq.exchange.topic;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Queue names used by the consumer project. The values mirror the queue
 * names declared on the producer side.
 */
@Configuration
public class TopicRabbitConstant {

    public static final String TOPIC_MESSAGE = "topic.message";

    public static final String TOPIC_MESSAGE_S = "topic.messages";

    public static final String USER_MESSAGE = "user.message";
}

package com.example.demo.rabbitMq.exchange.topic;

import com.example.demo.dto.User;
import com.example.demo.utils.Base64Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Consumer for the queue named {@code topic.message}.
 *
 * <p>Offers two ways to consume: the listener-driven {@link #process(User)}
 * handler, and {@link #rev1()}, which fetches a single message on demand.
 */
@Component
@RabbitListener(queues = TopicRabbitConstant.TOPIC_MESSAGE)
public class TopicReceiver1 {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Listener callback: prints each received user to standard output.
     *
     * @param user the converted message payload
     */
    @RabbitHandler
    public void process(User user) {
        System.out.println("Receiver1 : " + user);
    }

    /**
     * Fetches one message from the {@code topic.message} queue without using
     * the listener, prints the raw message, then prints the deserialized
     * {@link User}. Does nothing further when no message is returned.
     * Deserialization failures are logged rather than propagated.
     */
    public void rev1() {
        logger.info("獲取Queue[topic.message]消息>>>");

        // Same queue name as before, now taken from the shared constant.
        Message mesg = rabbitTemplate.receive(TopicRabbitConstant.TOPIC_MESSAGE);
        System.out.println(mesg);
        if (mesg == null) {
            return;
        }

        try {
            // NOTE(review): byteToObj presumably performs Java-native
            // deserialization (it can throw ClassNotFoundException); that is
            // only safe for trusted producers - confirm.
            User u = (User) Base64Utils.byteToObj(mesg.getBody());
            System.out.println(u);
        } catch (IOException | ClassNotFoundException e) {
            // Use the class logger instead of printStackTrace so the failure
            // lands in the configured log output with its full cause.
            logger.error("Failed to deserialize message from queue {}",
                    TopicRabbitConstant.TOPIC_MESSAGE, e);
        }
    }
}
測試:
先啟動消費者工程,再在生產者工程中執行如下測試方法:
@Testpublic void send1() throws Exception {//會匹配到topic.#和topic.message 兩個Receiver都可以收到消息for (int i = 0, size = 10; i < size; i++) {topicSender.send1();}}也可以不用監聽的方式,手動自主獲取隊列消息,如消費工程:
例如生產者工程TopicRabbitConfig.java添加武器隊列:
/** Name of the arsenal ("arm") queue. */
public static final String ARM_QUEUE = "arm.queue";

/** Declares the queue named {@code arm.queue}. */
@Bean
public Queue queueArm() {
    return new Queue(ARM_QUEUE);
}

/**
 * Binds the arsenal queue to the topic exchange so that every routing key
 * matching {@code arm.#} is delivered to it.
 */
@Bean
Binding bindingExchangeArm(Queue queueArm, TopicExchange exchange) {
    return BindingBuilder
            .bind(queueArm)
            .to(exchange)
            .with("arm.#");
}
生產武器:
public void send4() {//生產一批武器List<String> list = new ArrayList<String>();list.add("手槍");list.add("步槍");list.add("機槍");rabbitTemplate.convertAndSend("topicExchange","arm.gun",list);} @Testpublic void send4() throws Exception {topicSender.send4();}消費者:
package com.example.demo.rabbitMq;

import com.example.demo.dto.User;
import com.example.demo.rabbitMq.exchange.topic.TopicReceiver1;
import com.example.demo.utils.Base64Utils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;

/**
 * Consumer-side test that pulls a single message from the {@code arm.queue}
 * queue on demand instead of using a listener.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitMqRevTest {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /** Test entry point; delegates to {@link #rev1()}. */
    @Test
    public void topicRev1() {
        rev1();
    }

    /**
     * Fetches one message from {@code arm.queue}, prints the raw message,
     * then prints the deserialized list. Does nothing further when no
     * message is returned. Deserialization failures are logged rather than
     * propagated.
     */
    public void rev1() {
        // The log text previously named "arm.gun", which is the routing key;
        // the queue actually read below is "arm.queue".
        logger.info("獲取Queue[arm.queue]消息>>>");

        Message mesg = rabbitTemplate.receive("arm.queue");
        System.out.println(mesg);
        if (mesg == null) {
            return;
        }

        try {
            // NOTE(review): byteToObj presumably performs Java-native
            // deserialization (it can throw ClassNotFoundException); that is
            // only safe for trusted producers - confirm.
            List<?> u = (List<?>) Base64Utils.byteToObj(mesg.getBody());
            System.out.println(u);
        } catch (IOException | ClassNotFoundException e) {
            // Use the class logger instead of printStackTrace so the failure
            // lands in the configured log output with its full cause.
            logger.error("Failed to deserialize message from queue arm.queue", e);
        }
    }
}
測試:

樣例代碼:
https://github.com/xiaozhuanfeng?tab=repositories

轉載于:https://www.cnblogs.com/xiaozhuanfeng/p/10716236.html
總結
以上是生活随笔為你收集整理的RabbitMQ(4) TopicExchange的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 函数防抖和函数节流的最简单解释
- 下一篇: 双RR潮牌多少钱,在哪里能买到