rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关
轉:https://blog.csdn.net/u014373554/article/details/92686063
項目是使用springboot項目開發的,前是代碼實現,后面有分析發送消息失敗、消息持久化、消費者失敗處理方法和發送消息解決方法及手動確認的模式
先引入pom.xml
<!--rabbitmq--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>application 配置文件
spring: rabbitmq:host: IP地址port: 5672username: 用戶名password: 密碼RabbitConfig配置文件 import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope;/**Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。Queue:消息的載體,每個消息都會被投到一個或多個隊列。Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。vhost:虛擬主機,一個broker里可以有多個vhost,用作不同用戶的權限分離。Producer:消息生產者,就是投遞消息的程序.Consumer:消息消費者,就是接受消息的程序.Channel:消息通道,在客戶端的每個連接里,可建立多個channel. */ @Configuration @Slf4j public class RabbitConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;public static final String EXCHANGE_A = "my_mq_exchange_A";public static final String EXCHANGE_B = "my_mq_exchange_B";public static final String EXCHANGE_C = "my_mq_exchange_C";public static final String QUEUE_A="QUEUE_A";public static final String QUEUE_B="QUEUE_B";public static final String QUEUE_C="QUEUE_C";public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true); //設置發送消息失敗重試connectionFactory.setChannelCacheSize(100);//解決多線程發送消息return connectionFactory;}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(){RabbitTemplate template = new RabbitTemplate(connectionFactory());template.setMandatory(true); //設置發送消息失敗重試return template;}//配置使用json轉遞數據@Beanpublic Jackson2JsonMessageConverter producerJackson2MessageConverter() {return new Jackson2JsonMessageConverter();}/*public SimpleMessageListenerContainer messageListenerContainer(){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());adapter.setDefaultListenerMethod(new Jackson2JsonMessageConverter());return container;}*//*** 針對消費者配置* 1. 設置交換機類型* 2. 將隊列綁定到交換機* FanoutExchange: 將消息分發到所有的綁定隊列,無 routingkey的概念* HeadersExchange: 通過添加屬性key - value匹配* DirectExchange: 按照routingkey分發到指定隊列* TopicExchange : 多關鍵字匹配* @return*/@Beanpublic DirectExchange defaultExchange(){return new DirectExchange(EXCHANGE_A,true,false);}@Beanpublic Queue queueA(){return new Queue(QUEUE_A,true);// 隊列持久化}@Beanpublic Queue queueB(){return new Queue(QUEUE_B,true);// 隊列持久化}/*** 一個交換機可以綁定多個消息隊列,也就是消息通過一個交換機,可以分發到不同的隊列當中去。* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind( queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);}@Beanpublic Binding bindingB(){return BindingBuilder.bind( queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);}}生成者
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID;/*** 生產者*/ @Component @Slf4j public class ProducerMessage implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{private RabbitTemplate rabbitTemplate;@Autowiredpublic ProducerMessage(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this::confirm); //rabbitTemplate如果為單例的話,那回調就是最后設置的內容rabbitTemplate.setReturnCallback(this::returnedMessage);rabbitTemplate.setMandatory(true);}public void sendMsg (Object content){CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,content,correlationId);}/*** 消息發送到隊列中,進行消息確認* @param correlationData* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(" 消息確認的id: " + correlationData);if(ack){log.info("消息發送成功");//發送成功 刪除本地數據庫存的消息}else{log.info("消息發送失敗:id "+ correlationData +"消息發送失敗的原因"+ cause);// 根據本地消息的狀態為失敗,可以用定時任務去處理數據}}/*** 消息發送失敗返回監控* @param message* @param i* @param s* @param s1* @param s2*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.info("returnedMessage [消息從交換機到隊列失敗] message:"+message);} }消費者
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import net.sf.json.JSONObject; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException;/*** 消費者*/@Slf4j @Componentpublic class ComsumerMessage {@RabbitListener(queues = RabbitConfig.QUEUE_A)public void handleMessage(Message message,Channel channel) throws IOException{try {String json = new String(message.getBody());JSONObject jsonObject = JSONObject.fromObject(json);log.info("消息了【】handleMessage" + json);int i = 1/0;//業務處理。/*** 防止重復消費,可以根據傳過來的唯一ID先判斷緩存數據中是否有數據* 1、有數據則不消費,直接應答處理* 2、緩存沒有數據,則進行消費處理數據,處理完后手動應答* 3、如果消息 處理異常則,可以存入數據庫中,手動處理(可以增加短信和郵件提醒功能)*///手動應答channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){log.error("消費消息失敗了【】error:"+ message.getBody());log.error("OrderConsumer handleMessage {} , error:",message,e);// 處理消息失敗,將消息重新放回隊列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);}}}發送消息:調用生成的方法
import com.zz.blog.BlogApplicationTests; import com.zz.blog.mq.ProducerMessage; import net.sf.json.JSONObject; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import java.util.UUID; public class Message extends BlogApplicationTests {@Autowiredprivate ProducerMessage producerMessage;@Testpublic void sendMessage(){JSONObject jsonObject = new JSONObject();jsonObject.put("id", UUID.randomUUID().toString());jsonObject.put("name","TEST");jsonObject.put("desc","訂單已生成");//防止發送消息失敗,將發送消息存入本地。producerMessage.sendMsg(jsonObject.toString());} }rabbitTemplate的發送消息流程是這樣的:
1 發送數據并返回(不確認rabbitmq服務器已成功接收)
2 異步的接收從rabbitmq返回的ack確認信息
3 收到ack后調用confirmCallback函數
注意:在confirmCallback中是沒有原message的,所以無法在這個函數中調用重發,confirmCallback只有一個通知的作用
在這種情況下,如果在2,3步中任何時候切斷連接,我們都無法確認數據是否真的已經成功發送出去,從而造成數據丟失的問題。
最完美的解決方案只有1種:
使用rabbitmq的事務機制。
但是在這種情況下,rabbitmq的效率極低,每秒鐘處理的message在幾百條左右。實在不可取。
基于上面的分析,我們使用一種新的方式來做到數據的不丟失。
在rabbitTemplate異步確認的基礎上
1 在本地緩存已發送的message
2 通過confirmCallback或者被確認的ack,將被確認的message從本地刪除
3 定時掃描本地的message,如果大于一定時間未被確認,則重發
當然了,這種解決方式也有一定的問題:
想象這種場景,rabbitmq接收到了消息,在發送ack確認時,網絡斷了,造成客戶端沒有收到ack,重發消息。(相比于丟失消息,重發消息要好解決的多,我們可以在consumer端做到冪等)。
消息存入本地:在message 發消息的寫數據庫中。
消息應答成功,則刪除本地消息,失敗更改消息狀態,可以使用定時任務去處理。
消息持久化:
消費者:?
/*** 防止重復消費,可以根據傳過來的唯一ID先判斷緩存數據庫中是否有數據* 1、有數據則不消費,直接應答處理* 2、緩存沒有數據,則進行消費處理數據,處理完后手動應答* 3、如果消息 處理異常則,可以存入數據庫中,手動處理(可以增加短信和郵件提醒功能)*/轉載于:https://www.cnblogs.com/duende99/p/11597619.html
總結
以上是生活随笔為你收集整理的rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java学生管理系统界面设计
- 下一篇: 宝塔面板6.9.0一键破解脚本