rocktmq 消息延时清空_RocketMQ-延时消息
一、延時消息的使用
使用比較簡單,指定message的DelayTimeLevel即可。示例代碼如下:
Message msg = new Message("DelayTopicTest","TagA",("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) );
//設置延遲級別,注意這里的3不是代表延遲3s
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
目前rockatmq支持的延遲時間有:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
以上支持的延遲時間在msg.setDelayTimeLevel對應的級別依次是1,2,3。。。。
二、實現原理
延遲隊列的核心思路
所有的延遲消息由producer發出之后,都會存放到同一個topic(SCHEDULE_TOPIC_XXXX)下,不同的延遲級別會對應不同的隊列序號,當延遲時間到之后,由定時線程讀取轉換為普通的消息存的真實指定的topic下,此時對于consumer端此消息才可見,從而被consumer消費。
延遲消息存放的結構
consumequeue
├── SCHEDULE_TOPIC_XXXX
│ ├── 0
│ │ └── 00000000000000000000
│ ├── 1
│ │ └── 00000000000000000000
│ ├── 2
│ │ └── 00000000000000000000
│ ├── 3
│ │ └── 00000000000000000000
│ ├── 4
│ │ └── 00000000000000000000
.....
.....
├── DelayTopicTest
│ ├── 0
│ │ └── 00000000000000000000
│ ├── 1
│ │ └── 00000000000000000000
│ ├── 2
│ │ └── 00000000000000000000
│ └── 3
│ └── 00000000000000000000
其中不同的延遲級別放在不同的隊列序號下(queueId=delayLevel-1)。每一個延遲級別對應的延遲消息轉換為普通消息的位置標識存放在~/store/config/delayOffset.json文件內。
key為對應的延遲級別,value對應不同延遲級別轉換為普通消息的offset值。
{
"offsetTable":{1:1,2:1,3:11,4:1,5:1,6:1,7:1,8:1,9:1,10:1,11:1,12:1,13:1,14:1,15:0,16:0,17:0,18:0}
}
三、源碼分析
入口:ScheduleMessageService.start
broker啟動的時候會調用此方法。
通過jdk自帶的Timer類開啟一個timer定時器,在這個timer類添加了多個TimeTask。其中不同的延遲級別都對應DeliverDelayedMessageTimerTask的不同實例。
TimeTask分為兩類:DeliverDelayedMessageTimerTask(每秒執行1次)和 ScheduleMessageService.this.persist()(每10秒是執行一次)
每一個延遲級別對應一個offset,這個offset是干嘛的呢?(先拋結論:這個offset的值代表每個級別的延遲隊列已經轉換為普通消息的位置)
public void start() {
//1. 根據支持的各種延遲級別,添加不同延遲時間的TimeTask
for (Map.Entry entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
//每一個延遲級別對應已經讀取為普通消息的offset值
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), 1000L);
}
}
//2. 添加一個10s執行一次的TimeTask
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
兩類TimeTask的作用
DeliverDelayedMessageTimerTask
掃描延遲消息隊列(SCHEDULE_TOPIC_XXXX)的消息,將該延遲消息轉換為指定的topic的消息。
核心代碼:ScheduleMessageService.executeOnTimeup
讀取不同延遲級別對應的延遲消息
取得對應延遲級別讀取的開始位置offset
將延遲消息轉換為指定topic的普通消息并存放起來
修改下一次讀取的offset值(修改的只是緩存),并指定下一次轉換延遲消息的timetask
ScheduleMessageService.this.persist()
將延遲隊列掃描處理的進度offset持久化到delayOffset.json文件中
public void executeOnTimeup() {
//讀取隊列SCHEDULE_TOPIC_XXXX,其中不同的延遲級別對應不同的隊列id(queueId=delayLevel-1)
ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(“SCHEDULE_TOPIC_XXXX”,delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//循環讀取延遲消息
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = this.correctDeliverTimestamp(now, tagsCode) - now;
//只有當延遲消息發送的時間在當前時間之前才處理,否則此消息應該延遲后再處理
if (countdown <= 0) {
//根據offset值讀取SCHEDULE_TOPIC_XXXX隊列的消息
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt != null) {
try {
//將讀取的消息轉換為真實topic的消息(也就是普通消息)
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
//存放此消息
PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
} catch (Exception e) {
}
}
} else {
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
//計算下一次讀取延遲隊列的offset,是定時任務下一次從該位置讀取延時消息
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
//將下一次讀取延遲隊列的offset存放到一個緩存map中
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
}
}
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);
}
public synchronized void persist() {
//讀取offsetTable緩存的延遲隊列的值
String jsonString = this.encode(true);
if (jsonString != null) {
//讀取delayOffset.json的文件地址
String fileName = this.configFilePath();
try {
//持久化到delayOffset.json文件中
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file " + fileName + " exception", e);
}
}
}
四、總結
通過源碼分析我們其實明白了,延遲消息相比普通消息只不過是在broker多了一層消息topic的轉換,對于消息的發送和消費和普通消息沒有什么差異。
但這里有一點要注意
RocketMQ的延遲消息本身有一個很大的缺點,熟悉java自帶的Timer類的小伙伴應該知道一個timer對應只有一個線程,然后來處理不同的timeTask,而RockerMQ本身也確實只new了一個Timer,也就是說當同時發送的延遲消息過多的時候一個線程處理速度一定是有瓶頸的,因此在實際項目中使用延遲消息一定不要過多依賴,只能作為一個輔助手段。
總結
以上是生活随笔為你收集整理的rocktmq 消息延时清空_RocketMQ-延时消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: tomcat如何增大并发_Tomcat
- 下一篇: officeopenxml excelp