松花皮蛋的黑板報
  • 分享在京東工作的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的

掃一掃
關注公眾號

Kafka中的時間輪算法

博客首頁文章列表 松花皮蛋me 2019-03-24 12:16

一、定義

kafka中的層級時間輪是一個存儲定時任務的環形隊列,底層使用數組實現,數據中每個元素是一個定時任務雙向鏈表TimerTaskList,鏈表中的值才是真正的定時任務TimerTaskEntry。

二、如何表示無限的時間

如果直接使用數組來存儲定時任務,當精度要求是秒級時,數組長度無限大,內存肯定無法支撐的。其實timeWheel就是一個不存在hash沖突的數據結構,類似我們現實生活中的手表,任何時間中可以落入0-59的槽中,假設有秒級、分鐘級、小時級、一天級別、一月級別、一年級別……的精度“手表”,那么很長很長的時間就很方便用多個”手表”直觀表示了,也不需要占用太多的內存

public class TimeWheel {

/** 時間槽的刻度 */
private long tickMs;

/** 一圈有多少刻度 */
private int wheelSize;

/** 一圈能表示多長時間 */
private long interval;

/** 槽 */
private Bucket[] buckets;

/** 時間輪指針 */
private long currentTimestamp;

/** 上層時間輪 */
private volatile TimeWheel overflowWheel;

public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
    this.currentTimestamp = currentTimestamp;
    this.tickMs = tickMs;
    this.wheelSize = wheelSize;
    this.interval = tickMs * wheelSize;
    this.buckets = new Bucket[wheelSize];
    this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);

    for (int i = 0; i < wheelSize; i++) {
        buckets[i] = new Bucket();
    }
}
}

三、如何推進任務

需要注意的是kafka中的定時器只持有第一層timeWheel的引用。通過DelayQueue少量空間換時間的方法,將每個TimerTaskList按過期時間(TaskEntry的最短過期時間,添加時記錄)來排序,最短的放在隊頭。kafka中有一個expireOperationReaper線程,根據過期時間獲取到對應的TimerTaskEntry進行操作或者降級到下一層timeWheel

四、應用

kafka中是使用timeWheel實現延遲操作,比如延遲生產、延遲拉取、延遲刪除等,本文不進行深入討論。此處用客戶端主動發送心跳來說明如何使用timeWheel來管理連接-心跳。假設我們的精度是1s,一圈能表示的時間是60s。當有新連接加入時,就將連接放到現在指針位置的上一位,當60秒后客戶端還沒有心跳過來則連接斷開。如果在60秒,此連接有心跳發送,則將這個連接位置重置為當前指針的上一位

黑龙江6+1开奖结果查询