RocketMq(三、重试机制)
rocketmq的重試策略一般分為兩種:一種producer發送給MQ的重試,一種MQ發送給consumer的重試。
一、生產者的重試
package com.wk.test.rocketmqTest;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) {
//定義生產者名稱
DefaultMQProducer producer = new DefaultMQProducer("quickstart_product");
//連接rocketMQ的namesrv地址(這里是集群)
producer.setNamesrvAddr("10.32.16.179:9876");
//發送失敗重試3次
producer.setRetryTimesWhenSendFailed(3);
try {
producer.start();
//1.主題,一般在服務器設置好,不能從代碼中新建。2.標簽。3.發送內容。
Message message = new Message("TopicQuickStart","Tag1",("生產者重試").getBytes());
//發送設置超時時間10秒
SendResult sendResult = producer.send(message,10000);
System.out.println(sendResult);
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}finally {
producer.shutdown();
}
}
}
10秒內沒有發送成功,最大重試次數為3次
二、消費者的重試
消費者的重試又分為兩種情況:1.消費者接收到消息拋出exception異常。2.消費者沒有收到消息,MQ發送超時。
情況一
package com.wk.test.rocketmqTest;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws MQClientException {
//定義消費者名稱,MQ往消費者推送
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
//連接rocketMQ的namesrv地址(此次為集群)
consumer.setNamesrvAddr("10.32.16.179:9876");
//新訂閱組第一次啟動,從頭消費到尾,后續從上次的消費進度繼續消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//訂閱的主題和標簽(*代表所有標簽)
consumer.subscribe("TopicQuickStart", "Tag1 || Tag2");
//消費者監聽
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
MessageExt msg = msgs.get(0);
try {
String topic = msg.getTopic();
String msgbody = new String(msg.getBody(), "UTF-8");
String tag = msg.getTags();
System.out.println("topic:" + topic + " msgbody:" + msgbody + " tag:" + tag);
//dosomething...業務處理
} catch (Exception e) {
e.printStackTrace();
//重試3次扔不成功則不繼續重試
if(msg.getReconsumeTimes() == 3){
//記錄日志或進行持久化操作。
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//MQ發送失敗重試機制,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//消息處理成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
接收到消息后,消費端進行處理,若拋出異常,重試三次不成功則對該消息進行記錄。
情況二
comsumer為集群的情況下,當MQ發送消息給c1時,若c1接收到消息后宕機了,則同一條消息會發送給c2進行處理。前提是c1和c2必須為同一個消費者組,即
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
要相同。
而這里會出現一個問題,就是如果c1之后又進行了重啟,則可能同一條消息發送給c2的同時,又被c1所處理,這就出現了同一條消息被多個消費者消費的情況,這種情況就需要我們保持業務的冪等性。
解決方案在網上搜索有:
1、消費端處理消息的業務邏輯保持冪等性
2、保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現
第1條很好理解,只要保持冪等性,不管來多少條重復消息,最后處理的結果都一樣。第2條原理就是利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那么就不再處理這條消息。
我們可以看到第1條的解決方式,很明顯應該在消費端實現,不屬于消息系統要實現的功能。第2條可以消息系統實現,也可以業務端實現。正常情況下出現重復消息的概率不一定大,且由消息系統實現的話,肯定會對消息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理消息重復的問題,這也是RocketMQ不解決消息重復的問題的原因。
RocketMQ不保證消息不重復,如果你的業務需要保證嚴格的不重復消息,需要你自己在業務端去重。
生產者與消費者重試機制要點
在測試生產者的重試機制的時候,發現了個問題,發現消費者端不管重試次數設置為多少,重試的時間都差不多。
網上搜索源代碼如下:
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {
//1、獲取當前時間
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev ;
//2、去服務器看下有沒有主題消息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
//3、通過這里可以很明顯看出 如果不是同步發送消息 那么消息重試只有1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
//4、根據設置的重試次數,循環再去獲取服務器主題消息
for (times = 0; times < timesTotal; times++) {
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
//5、前后時間對比 如果前后時間差 大于 設置的等待時間 那么直接跳出for循環了 這就說明連接超時是不進行多次連接重試的
if (timeout < costTime) {
callTimeout = true;
break;
}
//6、如果超時直接報錯
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
}
}
由上可以看出:
1.異步發送重試只有1次。
2.只要連接時間超過設置的超時時間,則不會去重試。
3.producer重試是一個for循環重試的,是立即執行的,而consumer是有時間間隔執行的。
PS:producer默認重試2次,consumer默認重試16次。
總結
以上是生活随笔為你收集整理的RocketMq(三、重试机制)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 「已回复」请问毛泽东为什么又叫李德胜啊
- 下一篇: 偏执精神病(被害妄想症有多可怕)