RabbitMQ延迟消费和重复消费
轉載自?https://blog.csdn.net/quliuwuyiz/article/details/79301054
使用RabbitMQ實現延遲任務
場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設置成超時。
場景二:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統自動取消訂單。
延遲任務的模型如下圖:
?
基于 RabbitMQ 實現的分布式延遲重試隊列
場景一:在消費該消息的時候,發現條件不滿足,需要等待30分鐘,重新消費該消息,再次判斷是否滿足條件,如果滿足則消費該消息,如果不滿足,則再等待30分鐘。這樣的場景通過mq隊列來實現。
在消息隊列的監聽過程中,先判斷條件是否滿足,滿足,則直接消費。不滿足,則將該消息發送到上圖的死信隊列,但是在死信隊列失效之后,需要重新轉發到當前隊列進行消費就可以實現該功能。
基本概念如下: 消息的TTL ( Time to Live ) 和 DLX (Dead Letter Exchange)
?
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。
可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間,兩者是一樣的效果。只是expiration字段是字符串參數,所以要寫個int類型的字符串:
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
當上面的消息扔到隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被刪除和釋放,它會被統計到隊列的消息數中去。單靠死信還不能實現延遲任務,還要靠Dead Letter Exchange。
Dead Letter Exchanges
Exchage的概念在這里就不在贅述,可以從這里進行了解。一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。
1. 一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
2. 上面的消息的TTL到了,消息過期了。
3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
Dead Letter Exchange其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。
package com.test.sender.delay;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import com.drools.model.MQPushErrorFlow;
@Component
@PropertySource(value = "classpath:riskConfigMq.properties")
public class LifsInCompleteDataOneSend {
private static final Log log = LogFactory.getLog(LifsInCompleteDataOneConfig.class);
private static final String DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST = "delay_queue_per_queue_lifs_ttl"; // TTL配置在隊列上的緩沖隊列。
private static final String DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST = "delay_queue_per_queue_lifs_routing_key"; // TTL配置在隊列上的緩沖隊列。
private static final Integer QUEUE_EXPIRATION_FIRST = 30000;
/**
* 消息隊列業務名稱
*/
@Value("${lifs.consumer.pushServiceName}")
private String pushServiceName;
/**
* 訂閱平臺名稱
*/
@Value("${lifs.consumer.platformName}")
private String platformName;
/**
* 消息隊列一個業務使用的隊列的數量
*/
@Value("${lifs.consumer.queueShardingCount}")
private Integer queueShardingCount;
/**
* 交換機的名稱,共用lifs監聽的交換機
*/
@Value("${lifs.consumer.exchangeName}")
private String exchangeName;
/**
* 重發次數
*/
@Value("${rabbitmq.resend.times}")
private Integer resendTimes;
/**
* 底層需要使用的真實發送對象,每個發送對象都需要對應一個
*/
@Resource(name = "lnCompleteDataOneRabbitTemplate")
private RabbitTemplate rabbitTemplate;
@Bean
public Queue delayQueueFirstTTL() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", exchangeName);
arguments.put("x-dead-letter-routing-key", getDirectRoutingKey(pushServiceName, 0, platformName));
arguments.put("x-message-ttl", QUEUE_EXPIRATION_FIRST);
Queue queue = new Queue(DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST, true, false, false, arguments);
log.info("第一次延遲隊列名稱: " + DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST + " ?延期之后的轉發的routingKey: " + getDirectRoutingKey(pushServiceName, 0, platformName) + " ?exchange: " + exchangeName);
/*
* Queue queue = QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL) //
* delay_queue_per_queue_ttl .withArgument("x-dead-letter-exchange",DELAY_EXCHANGE_NAME)
* .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME)
* .withArgument("x-message-ttl",QUEUE_EXPIRATION).build();?
* Queue queue =new Queue(DELAY_QUEUE_PER_QUEUE_TTL,true);
*/
return queue;
}
@Bean
public Binding lnCompleteDataOneBinding() {
return BindingBuilder.bind(delayQueueFirstTTL()).to(lnCompleteDataOneExchange()).with(DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST);
}
@Bean(name = "lnCompleteDataOneExchange")
public DirectExchange lnCompleteDataOneExchange() {
return new DirectExchange(exchangeName);
}
????private String getDirectRoutingKey(String pushServiceName, int shardingIndex, String platformName) {
????????return String.format("%s.%d.%s", pushServiceName, shardingIndex, platformName);
????}
????
????@Bean(name = "delayQueueFirstListenerContainer")
????public String delayQueueFirstListenerContainer(@Qualifier("lnCompleteDataOneConnectionFactory") ConnectionFactory connectionFactory) {
????Queue queue = delayQueueFirstTTL();
????RabbitAdmin ra = new RabbitAdmin(connectionFactory);
????????ra.declareExchange(lnCompleteDataOneExchange());
????????ra.declareQueue(queue);
????????ra.declareBinding(lnCompleteDataOneBinding());
????????log.info("delayQueueFirstListenerContainer: queueName" + queue.getName() + " ?exchangeName: " + lnCompleteDataOneExchange().getName() + " routingKey: " + DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST);
????????return "";
????}
/**
* 自動生成uuid調用發送方法
*?
* @param dto
* @param routingId
*/
public String send(String message) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("延遲半分鐘的隊列中接受消息的時間: " + df.format(new Date()) + "\n消息的內容:" + message);
rabbitTemplate.convertAndSend(DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST, message); // 向隊列里面發送消息,第一個參數是隊列名稱,第二個參數是內容
return "sender delay";
}
}
package com.test.sender.delay;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import com.framework.mq.common.RabbitConfig;
@Component
@PropertySource(value = "classpath:riskConfigMq.properties")
public class LifsInCompleteDataOneConfig {
/**
* MQ服務地址和端口號
*/
@Value("${rabbitmq.addresses}")
private String addresses;
/**
* MQ用戶名
*/
@Value("${rabbitmq.username}")
private String username;
/**
* MQ密碼
*/
@Value("${rabbitmq.password}")
private String password;
/**
* MQ的虛擬主機
*/
@Value("${rabbitmq.virtualHost}")
private String virtualHost;
/**
* MQ是否使用發送確認模式(必須開啟)
*/
@Value("${rabbitmq.publisherConfirms}")
private boolean publisherConfirms;
/**
* 緩存的channel的數量
*/
@Value("${rabbitmq.channelCacheSize}")
private Integer channelCacheSize;
/**
* 緩存的連接數量
*/
@Value("${rabbitmq.connectionCacheSize}")
private Integer connectionCacheSize;
/**
* 交換機的名稱,共用lifs監聽的交換機
*/
@Value("${lifs.consumer.exchangeName}")
private String exchangeName;
/**
* 注入RabbitConfig對象
* @return
*/
@Bean(name = "lnCompleteDataOneRabbitConfig")
public RabbitConfig rabbitConfig() {
return new RabbitConfig(addresses, username, password, virtualHost, publisherConfirms, channelCacheSize,
connectionCacheSize, exchangeName);
}
/**
* 注入連接工廠對象
*?
* @param rabbitConfig 之前注入的 @RabbitConfig 對象
* @return
*/
@Bean(name = "lnCompleteDataOneConnectionFactory")
public ConnectionFactory connectionFactory(
@Qualifier(value = "lnCompleteDataOneRabbitConfig") RabbitConfig rabbitConfig) {
return rabbitConfig.getConnectionFactory();
}
/**
* 注入的 @RabbitTemplate 對象
*?
* @param connectionFactory
* @return
*/
@Bean(name = "lnCompleteDataOneRabbitTemplate")
RabbitTemplate rabbitTemplate(
@Qualifier("lnCompleteDataOneConnectionFactory") ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
在初次監聽消息隊列的地方
在業務代碼中,判斷條件是否滿足,如果不滿足,賦值incompleteDataFlagResult=1,在第二次重試的時候,如果還不滿足,則賦值incompleteDataFlagResult=2,如果滿足,則賦值incompleteDataFlagResult=200,直接消費,并發送回調的mq。
if(incompleteDataFlagResult==1){ //推進到等待30秒過期的隊列
lifsInCompleteDataOneSend.send(JSONObject.toJSONString(request));
}else if(incompleteDataFlagResult==2){ ?//推進到等待60秒過期的隊列
lifsInCompleteDataTwoSend.send(JSONObject.toJSONString(request));
}else if(incompleteDataFlagResult==3){ // 進行保存,需要手工處理
InstallmentRequestFailure installmentRequestFailure = new InstallmentRequestFailure();
installmentRequestFailureService.save(installmentRequestFailure);
}else if(incompleteDataFlagResult==200){
lifsPushSender.send(request, customerId);
}
?
---------------------?
作者:quliuwuyiz?
來源:CSDN?
原文:https://blog.csdn.net/quliuwuyiz/article/details/79301054?
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
總結
以上是生活随笔為你收集整理的RabbitMQ延迟消费和重复消费的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring注解式参数校验
- 下一篇: XSS跨站脚本攻击在Java开发中防范的