原 荐 简单说说Kafka中的时间轮算法
圖片描述(最多50字)
如果你理解了上面的定義,那么就不必往下看了。但如果你第一次看到和我一樣懵比,并且有不少疑問(wèn),那么這篇博文將帶你進(jìn)一步了解時(shí)間輪,甚至理解時(shí)間輪算法。如果有興趣,可以去看看其他的定時(shí)器 你真的了解延時(shí)隊(duì)列嗎 。博主認(rèn)為,時(shí)間輪定時(shí)器最大的優(yōu)點(diǎn):是任務(wù)的添加與移除,都是O(1)級(jí)的復(fù)雜度;
不會(huì)占用大量的資源;
只需要有一個(gè)線程去推進(jìn)時(shí)間輪就可以工作了。
我們將對(duì)時(shí)間輪做層層推進(jìn)的解析:
private Task[很長(zhǎng)] tasks;
public List<Task> getTaskList(long timestamp) {
return task.get(timestamp)
}
// 假裝這里真的能一毫秒一個(gè)循環(huán)
public void run(){
while (true){
getTaskList(System.currentTimeMillis()).后臺(tái)執(zhí)行()
Thread.sleep(1);
}
}
假如這個(gè)數(shù)組長(zhǎng)度達(dá)到了億億級(jí),我們確實(shí)可以這么干。 那如果將精度縮減到秒級(jí)呢?我們也需要一個(gè)百億級(jí)長(zhǎng)度的數(shù)組。
/ 一個(gè)精度為秒級(jí)的延時(shí)任務(wù)管理類(lèi) /
private Map<Long, Task> taskMap;
public List<Task> getTaskList(long timestamp) {
return taskMap.get(timestamp - timestamp % 1000)
}
// 新增一個(gè)任務(wù)
public void addTask(long timestamp, Task task) {
List<Task> taskList = getTaskList(timestamp - timestamp % 1000);
if (taskList == null){
taskList = new ArrayList();
}
taskList.add(task);
}
// 假裝這里真的能一秒一個(gè)循環(huán)
public void run(){
while (true){
getTaskList(System.currentTimeMillis()).后臺(tái)執(zhí)行()
Thread.sleep(1000);
}
}
其實(shí)時(shí)間輪就是一個(gè)不存在hash沖突的數(shù)據(jù)結(jié)構(gòu)
圖片描述(最多50字)
就拿秒表來(lái)說(shuō),它總是落在 0 - 59 秒,每走一圈,又會(huì)重新開(kāi)始。用偽代碼模擬一下我們這個(gè)秒表:private Bucket[60] buckets;// 表示60秒
public void addTask(long timestamp, Task task) {
Bucket bucket = buckets[timestamp / 1000 % 60];
bucket.add(task);
}
public Bucket getBucket(long timestamp) {
return buckets[timestamp / 1000 % 60];
}
// 假裝這里真的能一秒一個(gè)循環(huán)
public void run(){
while (true){
getBucket(System.currentTimeMillis()).后臺(tái)執(zhí)行()
Thread.sleep(1000);
}
}
這樣,我們的時(shí)間總能落在0 - 59任意一個(gè)bucket上,就如同我們的秒鐘總是落在0 - 59刻度上一樣,這便是 時(shí)間輪的環(huán)形隊(duì)列 。
public class TimeWheel {
/ 一個(gè)時(shí)間槽的時(shí)間 */
private long tickMs;
/* 時(shí)間輪大小 /
private int wheelSize;
/ 時(shí)間跨度 */
private long interval;
/ 槽 */
private Bucket[] buckets;
/* 時(shí)間輪指針 /
private long currentTimestamp;
/ 上層時(shí)間輪 /
private volatile TimeWheel overflowWheel;
public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
this.currentTimestamp = currentTimestamp;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs wheelSize;
this.buckets = new Bucket[wheelSize];
this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);
for (int i = 0; i < wheelSize; i++) {
buckets[i] = new Bucket();
}
}
}
將任務(wù)添加到時(shí)間輪中十分簡(jiǎn)單,對(duì)于每個(gè)時(shí)間輪來(lái)說(shuō),比如說(shuō)秒級(jí)時(shí)間輪,和分級(jí)時(shí)間輪,都有它自己的過(guò)期槽。也就是delayMs < tickMs的時(shí)候。
1)比如說(shuō)有一個(gè)任務(wù)要在 16:29:07 執(zhí)行,從秒級(jí)時(shí)間輪中來(lái)看,當(dāng)我們的當(dāng)前時(shí)間走到16:29:06的時(shí)候,則表示這個(gè)任務(wù)已經(jīng)過(guò)期了。因?yàn)樗膁elayMs = 1000ms,小于了我們的秒級(jí)時(shí)間輪的tickMs(1000ms)。
比如說(shuō)有一個(gè)任務(wù)要在 16:41:25 執(zhí)行,從分級(jí)時(shí)間輪中來(lái)看,當(dāng)我們的當(dāng)前時(shí)間走到 16:41的時(shí)候( 分級(jí)時(shí)間輪沒(méi)有秒針!它的最小精度是分鐘(一定要理解這一點(diǎn)) ),則表示這個(gè)任務(wù)已經(jīng)到期,因?yàn)樗膁elayMs = 25000ms,小于了我們的分級(jí)時(shí)間輪的tickMs(60000ms)。
二、時(shí)間未到期,且delayMs小于interval。
/**
- 添加任務(wù)到某個(gè)時(shí)間輪
*/
public boolean addTask(TimedTask timedTask) {
long expireTimestamp = timedTask.getExpireTimestamp();
long delayMs = expireTimestamp - currentTimestamp;
if (delayMs < tickMs) {// 到期了
return false;
} else {
// 扔進(jìn)當(dāng)前時(shí)間輪的某個(gè)槽中,只有時(shí)間【大于某個(gè)槽】,才會(huì)放進(jìn)去
if (delayMs < interval) {
int bucketIndex = (int) (((delayMs + currentTimestamp) / tickMs) % wheelSize);
Bucket bucket = buckets[bucketIndex];
bucket.addTask(timedTask);
} else {
// 當(dāng)maybeInThisBucket大于等于wheelSize時(shí),需要將它扔到上一層的時(shí)間輪
TimeWheel timeWheel = getOverflowWheel();
timeWheel.addTask(timedTask);
}
}
return true;
}
/** -
獲取或創(chuàng)建一個(gè)上層時(shí)間輪
*/
private TimeWheel getOverflowWheel() {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, currentTimestamp, delayQueue);
}
}
}
return overflowWheel;
}
當(dāng)然我們的時(shí)間輪還需要一個(gè)指針的推進(jìn)機(jī)制,總不能讓時(shí)間永遠(yuǎn)停留在當(dāng)前吧?推進(jìn)的時(shí)候,同時(shí)類(lèi)似遞歸,去推進(jìn)一下上一層的時(shí)間輪。注意:要強(qiáng)調(diào)一點(diǎn)的是,我們這個(gè)時(shí)間輪更像是電子表,它不存在時(shí)間的中間狀態(tài),也就是精度這個(gè)概念一定要理解好。比如說(shuō),對(duì)于秒級(jí)時(shí)間輪來(lái)說(shuō),它的精度只能保證到1秒,小于1秒的,都會(huì)當(dāng)成是已到期
對(duì)于分級(jí)時(shí)間輪來(lái)說(shuō),它的精度只能保證到1分,小于1分的,都會(huì)當(dāng)成是已到期
/**
-
嘗試推進(jìn)一下指針
*/
public void advanceClock(long timestamp) {
if (timestamp >= currentTimestamp + tickMs) {
currentTimestamp = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
this.getOverflowWheel()
.advanceClock(timestamp);
}
}
}
三、對(duì)于高層時(shí)間輪來(lái)說(shuō),精度越來(lái)越不準(zhǔn),會(huì)不會(huì)有影響?上面說(shuō)到,分級(jí)時(shí)間輪,精度只有分鐘級(jí),總不能延遲1秒的定時(shí)任務(wù)和延遲59秒的定時(shí)任務(wù)同時(shí)執(zhí)行吧?
有這個(gè)疑問(wèn)的同學(xué)很好!實(shí)際上很好解決,只需再入時(shí)間輪即可。比如說(shuō),對(duì)于分鐘級(jí)時(shí)間輪來(lái)說(shuō),delayMs為1秒和delayMs為59秒的都已經(jīng)過(guò)期,我們將其取出,再扔進(jìn)底層的時(shí)間輪不就可以了?
1秒的會(huì)被扔到秒級(jí)時(shí)間輪的下一個(gè)執(zhí)行槽中,而59秒的會(huì)被扔到秒級(jí)時(shí)間輪的后59個(gè)時(shí)間槽中。
細(xì)心的同學(xué)會(huì)發(fā)現(xiàn),我們的添加任務(wù)方法,返回的是一個(gè)bool
public boolean addTask(TimedTask timedTask)
再倒回去好好看看,添加到最底層時(shí)間輪失敗的(我們只能直接操作最底層的時(shí)間輪,不能直接操作上層的時(shí)間輪),是不是會(huì)直接返回flase? 對(duì)于再入失敗的任務(wù),我們直接執(zhí)行即可。
/**
-
將任務(wù)添加到時(shí)間輪
*/
public void addOrSubmitTask(TimedTask timedTask) {
if (!timeWheel.addTask(timedTask)) {
taskExecutor.submit(timedTask.getTask());
}
}
四、如何知道一個(gè)任務(wù)已經(jīng)過(guò)期?記得我們將任務(wù)存儲(chǔ)在槽中嘛?比如說(shuō)秒級(jí)時(shí)間輪中,有60個(gè)槽,那么一共有60個(gè)槽。如果時(shí)間輪共有兩層,也僅僅只有120個(gè)槽。我們只需將槽扔進(jìn)一個(gè)delayedQueue之中即可。
我們輪詢地從delayedQueue取出已經(jīng)過(guò)期的槽即可。(前面的所有代碼,為了簡(jiǎn)單說(shuō)明,并沒(méi)有引入這個(gè)DelayQueue的概念,所以不用去上面翻了,并沒(méi)有。博主覺(jué)得... 已經(jīng)看到這里了,應(yīng)該很明白這個(gè)DelayQueue的意義了。 )
其實(shí)簡(jiǎn)單來(lái)說(shuō),實(shí)際上定時(shí)任務(wù)單單使用DelayQueue來(lái)實(shí)現(xiàn),也是可以的,但是一旦任務(wù)的數(shù)量多了起來(lái),達(dá)到了百萬(wàn)級(jí),千萬(wàn)級(jí),針對(duì)這個(gè)delayQueue的增刪,將非常的慢。
一、面向槽的delayQueue
而對(duì)于時(shí)間輪來(lái)說(shuō),它只需要往delayQueue里面扔各種槽即可,比如我們的定時(shí)任務(wù)長(zhǎng)短不一,最長(zhǎng)的跨度到了24年,這個(gè)delayQueue也僅僅只有300個(gè)元素。
二、處理過(guò)期的槽
而這個(gè)槽到期后,也就是被我們從delayQueue中poll出來(lái)后,我們只需要將槽中的所有任務(wù)循環(huán)一次,重新加到新的槽中(添加失敗則直接執(zhí)行)即可。
/**
- 推進(jìn)一下時(shí)間輪的指針,并且將delayQueue中的任務(wù)取出來(lái)再重新扔進(jìn)去
*/
public void advanceClock(long timeout) {
try {
Bucket bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (bucket != null) {
timeWheel.advanceClock(bucket.getExpire());
bucket.flush(this::addTask);
}
} catch (Exception e) {
e.printStackTrace();
}
}
轉(zhuǎn)載于:https://blog.51cto.com/14028890/2309569
與50位技術(shù)專(zhuān)家面對(duì)面20年技術(shù)見(jiàn)證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的原 荐 简单说说Kafka中的时间轮算法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 在 sql server 中,查询 数据
- 下一篇: MYSQL limit,offset 区