新聞中心
上一篇我們講了定時(shí)器的幾種實(shí)現(xiàn),分析了在大數(shù)據(jù)量高并發(fā)的場(chǎng)景下這幾種實(shí)現(xiàn)方式就有點(diǎn)力不從心了,從而引出時(shí)間輪這種數(shù)據(jù)結(jié)構(gòu)。在netty 和kafka 這兩種優(yōu)秀的中間件中,都有時(shí)間輪的實(shí)現(xiàn)。文章最后,我們模擬kafka 中scala 的代碼實(shí)現(xiàn)java版的時(shí)間輪。

公司主營(yíng)業(yè)務(wù):網(wǎng)站制作、成都網(wǎng)站制作、移動(dòng)網(wǎng)站開(kāi)發(fā)等業(yè)務(wù)。幫助企業(yè)客戶(hù)真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。創(chuàng)新互聯(lián)建站是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開(kāi)放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶(hù)帶來(lái)驚喜。創(chuàng)新互聯(lián)建站推出三山免費(fèi)做網(wǎng)站回饋大家。
Netty 的時(shí)間輪實(shí)現(xiàn)
接口定義
Netty 的實(shí)現(xiàn)自定義了一個(gè)超時(shí)器的接口io.netty.util.Timer,其方法如下:
- public interface Timer
- {
- //新增一個(gè)延時(shí)任務(wù),入?yún)槎〞r(shí)任務(wù)TimerTask,和對(duì)應(yīng)的延遲時(shí)間
- Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
- //停止時(shí)間輪的運(yùn)行,并且返回所有未被觸發(fā)的延時(shí)任務(wù)
- Set < Timeout > stop();
- }
- public interface Timeout
- {
- Timer timer();
- TimerTask task();
- boolean isExpired();
- boolean isCancelled();
- boolean cancel();
- }
Timeout接口是對(duì)延遲任務(wù)的一個(gè)封裝,其接口方法說(shuō)明其實(shí)現(xiàn)內(nèi)部需要維持該延遲任務(wù)的狀態(tài)。后續(xù)我們分析其實(shí)現(xiàn)內(nèi)部代碼時(shí)可以更容易的看到。
Timer接口有唯一實(shí)現(xiàn)HashedWheelTimer。首先來(lái)看其構(gòu)造方法,如下:
- public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts)
- {
- //省略代碼,省略參數(shù)非空檢查內(nèi)容。
- wheel = createWheel(ticksPerWheel);
- mask = wheel.length - 1;
- //省略代碼,省略槽位時(shí)間范圍檢查,避免溢出以及小于 1 毫秒。
- workerThread = threadFactory.newThread(worker);
- //省略代碼,省略資源泄漏追蹤設(shè)置以及時(shí)間輪實(shí)例個(gè)數(shù)檢查
- }
mask 的設(shè)計(jì)和HashMap一樣,通過(guò)限制數(shù)組的大小為2的次方,利用位運(yùn)算來(lái)替代取模運(yùn)算,提高性能。
構(gòu)建循環(huán)數(shù)組
首先是方法createWheel,用于創(chuàng)建時(shí)間輪的核心數(shù)據(jù)結(jié)構(gòu),循環(huán)數(shù)組。來(lái)看下其方法內(nèi)容
- private static HashedWheelBucket[] createWheel(int ticksPerWheel)
- {
- //省略代碼,確認(rèn) ticksPerWheel 處于正確的區(qū)間
- //將 ticksPerWheel 規(guī)范化為 2 的次方冪大小。
- ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
- HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
- for(int i = 0; i < wheel.length; i++)
- {
- wheel[i] = new HashedWheelBucket();
- }
- return wheel;
- }
數(shù)組的長(zhǎng)度為 2 的次方冪方便進(jìn)行求商和取余計(jì)算。
HashedWheelBucket內(nèi)部存儲(chǔ)著由HashedWheelTimeout節(jié)點(diǎn)構(gòu)成的雙向鏈表,并且存儲(chǔ)著鏈表的頭節(jié)點(diǎn)和尾結(jié)點(diǎn),方便于任務(wù)的提取和插入。
新增延遲任務(wù)
方法HashedWheelTimer#newTimeout用于新增延遲任務(wù),下面來(lái)看下代碼:
- public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
- {
- //省略代碼,用于參數(shù)檢查
- start();
- long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
- if(delay > 0 && deadline < 0)
- {
- deadline = Long.MAX_VALUE;
- }
- HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
- timeouts.add(timeout);
- return timeout;
- }
可以看到任務(wù)并沒(méi)有直接添加到時(shí)間輪中,而是先入了一個(gè) mpsc 隊(duì)列,我簡(jiǎn)單說(shuō)下 mpsc【多生產(chǎn)者單一消費(fèi)者隊(duì)列】 是 JCTools 中的并發(fā)隊(duì)列,用在多個(gè)生產(chǎn)者可同時(shí)訪問(wèn)隊(duì)列,但只有一個(gè)消費(fèi)者會(huì)訪問(wèn)隊(duì)列的情況。,采用這個(gè)模式主要出于提升并發(fā)性能考慮,因?yàn)檫@個(gè)隊(duì)列只有線(xiàn)程workerThread會(huì)進(jìn)行任務(wù)提取操作。
工作線(xiàn)程如何執(zhí)行
- public void run()
- {
- {//代碼塊①
- startTime = System.nanoTime();
- if(startTime == 0)
- {
- //使用startTime==0 作為線(xiàn)程進(jìn)入工作狀態(tài)模式標(biāo)識(shí),因此這里重新賦值為1
- startTime = 1;
- }
- //通知外部初始化工作線(xiàn)程的線(xiàn)程,工作線(xiàn)程已經(jīng)啟動(dòng)完畢
- startTimeInitialized.countDown();
- }
- {//代碼塊②
- do {
- final long deadline = waitForNextTick();
- if(deadline > 0)
- {
- int idx = (int)(tick & mask);
- processCancelledTasks();
- HashedWheelBucket bucket = wheel[idx];
- transferTimeoutsToBuckets();
- bucket.expireTimeouts(deadline);
- tick++;
- }
- } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
- }
- {//代碼塊③
- for(HashedWheelBucket bucket: wheel)
- {
- bucket.clearTimeouts(unprocessedTimeouts);
- }
- for(;;)
- {
- HashedWheelTimeout timeout = timeouts.poll();
- if(timeout == null)
- {
- break;
- }
- if(!timeout.isCancelled())
- {
- unprocessedTimeouts.add(timeout);
- }
- }
- processCancelledTasks();
- }
- }
看 waitForNextTick,是如何得到下一次執(zhí)行時(shí)間的。
- private long waitForNextTick()
- {
- long deadline = tickDuration * (tick + 1);//計(jì)算下一次需要檢查的時(shí)間
- for(;;)
- {
- final long currentTime = System.nanoTime() - startTime;
- long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
- if(sleepTimeMs <= 0)//說(shuō)明時(shí)間已經(jīng)到了
- {
- if(currentTime == Long.MIN_VALUE)
- {
- return -Long.MAX_VALUE;
- }
- else
- {
- return currentTime;
- }
- }
- //windows 下有bug sleep 必須是10 的倍數(shù)
- if(PlatformDependent.isWindows())
- {
- sleepTimeMs = sleepTimeMs / 10 * 10;
- }
- try
- {
- Thread.sleep(sleepTimeMs);// 等待時(shí)間到來(lái)
- }
- catch(InterruptedException ignored)
- {
- if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
- {
- return Long.MIN_VALUE;
- }
- }
- }
- }
簡(jiǎn)單的說(shuō)就是通過(guò) tickDuration 和此時(shí)已經(jīng)滴答的次數(shù)算出下一次需要檢查的時(shí)間,時(shí)候未到就sleep等著。
任務(wù)如何入槽的。
- private void transferTimeoutsToBuckets() {
- //最多處理100000 怕任務(wù)延遲
- for(int i = 0; i < 100000; ++i) {
- //從隊(duì)列里面拿出任務(wù)呢
- HashedWheelTimer.HashedWheelTimeout timeout = (HashedWheelTimer.HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
- if (timeout == null) {
- break;
- }
- if (timeout.state() != 1) {
- long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration;
- //計(jì)算排在第幾輪
- timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length;
- long ticks = Math.max(calculated, this.tick);
- //計(jì)算放在哪個(gè)槽中
- int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask);
- HashedWheelTimer.HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex];
- //入槽,就是鏈表入隊(duì)列
- bucket.addTimeout(timeout);
- }
- }
- }
如何執(zhí)行的
- public void expireTimeouts(long deadline) {
- HashedWheelTimer.HashedWheelTimeout next;
- //拿到槽的鏈表頭部
- for(HashedWheelTimer.HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) {
- boolean remove = false;
- if (timeout.remainingRounds <= 0L) {//如果到這輪l
- if (timeout.deadline > deadline) {
- throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
- }
- timeout.expire();//執(zhí)行
- remove = true;
- } else if (timeout.isCancelled()) {
- remove = true;
- } else {
- --timeout.remainingRounds;//輪數(shù)-1
- }
- next = timeout.next;//繼續(xù)下一任務(wù)
- if (remove) {
- this.remove(timeout);//移除完成的任務(wù)
- }
- }
- }
就是通過(guò)輪數(shù)和時(shí)間雙重判斷,執(zhí)行完了移除任務(wù)。
小結(jié)一下
總體上看 Netty 的實(shí)現(xiàn)就是上文說(shuō)的時(shí)間輪通過(guò)輪數(shù)的實(shí)現(xiàn),完全一致??梢钥闯鰰r(shí)間精度由 TickDuration 把控,并且工作線(xiàn)程的除了處理執(zhí)行到時(shí)的任務(wù)還做了其他操作,因此任務(wù)不一定會(huì)被精準(zhǔn)的執(zhí)行。
而且任務(wù)的執(zhí)行如果不是新起一個(gè)線(xiàn)程,或者將任務(wù)扔到線(xiàn)程池執(zhí)行,那么耗時(shí)的任務(wù)會(huì)阻塞下個(gè)任務(wù)的執(zhí)行。
并且會(huì)有很多無(wú)用的 tick 推進(jìn),例如 TickDuration 為1秒,此時(shí)就一個(gè)延遲350秒的任務(wù),那就是有349次無(wú)用的操作。出現(xiàn)空推。
但是從另一面來(lái)看,如果任務(wù)都執(zhí)行很快(當(dāng)然你也可以異步執(zhí)行),并且任務(wù)數(shù)很多,通過(guò)分批執(zhí)行,并且增刪任務(wù)的時(shí)間復(fù)雜度都是O(1)來(lái)說(shuō)。時(shí)間輪還是比通過(guò)優(yōu)先隊(duì)列實(shí)現(xiàn)的延時(shí)任務(wù)來(lái)的合適些。
Kafka 中的時(shí)間輪
上面我們說(shuō)到 Kafka 中的時(shí)間輪是多層次時(shí)間輪實(shí)現(xiàn),總的而言實(shí)現(xiàn)和上述說(shuō)的思路一致。不過(guò)細(xì)節(jié)有些不同,并且做了點(diǎn)優(yōu)化。
先看看添加任務(wù)的方法。在添加的時(shí)候就設(shè)置任務(wù)執(zhí)行的絕對(duì)時(shí)間。
Kafka 中的時(shí)間輪
上面我們說(shuō)到 Kafka 中的時(shí)間輪是多層次時(shí)間輪實(shí)現(xiàn),總的而言實(shí)現(xiàn)和上述說(shuō)的思路一致。不過(guò)細(xì)節(jié)有些不同,并且做了點(diǎn)優(yōu)化。
先看看添加任務(wù)的方法。在添加的時(shí)候就設(shè)置任務(wù)執(zhí)行的絕對(duì)時(shí)間。
- def add(timerTaskEntry: TimerTaskEntry): Boolean = {
- val expiration = timerTaskEntry.expirationMs
- if (timerTaskEntry.cancelled) {
- // Cancelled
- false
- } else if (expiration < currentTime + tickMs) {
- // 如果已經(jīng)到期 返回false
- // Already expired
- false
- } else if (expiration < currentTime + interval) {//如果在本層范圍內(nèi)
- // Put in its own bucket
- val virtualId = expiration / tickMs
- val bucket = buckets((virtualId % wheelSize.toLong).toInt)//計(jì)算槽位
- bucket.add(timerTaskEntry)//添加到槽內(nèi)雙向鏈表中
- // Set the bucket expiration time
- if (bucket.setExpiration(virtualId * tickMs)) {//更新槽時(shí)間
- // The bucket needs to be enqueued because it was an expired bucket
- // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
- // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
- // will pass in the same value and hence return false, thus the bucket with the same expiration will not
- // be enqueued multiple times.
- queue.offer(bucket)//將槽加入DelayQueue,由DelayQueue來(lái)推進(jìn)執(zhí)行
- }
- true
- } else {
- //如果超過(guò)本層能表示的延遲時(shí)間,則將任務(wù)添加到上層。這里看到上層是按需創(chuàng)建的。
- // Out of the interval. Put it into the parent timer
- if (overflowWheel == null) addOverflowWheel()
- overflowWheel.add(timerTaskEntry)
- }
- }
那么時(shí)間輪是如何推動(dòng)的呢?Netty 中是通過(guò)固定的時(shí)間間隔掃描,時(shí)候未到就等待來(lái)進(jìn)行時(shí)間輪的推動(dòng)。上面我們分析到這樣會(huì)有空推進(jìn)的情況。
而 Kafka 就利用了空間換時(shí)間的思想,通過(guò) DelayQueue,來(lái)保存每個(gè)槽,通過(guò)每個(gè)槽的過(guò)期時(shí)間排序。這樣擁有最早需要執(zhí)行任務(wù)的槽會(huì)有優(yōu)先獲取。如果時(shí)候未到,那么 delayQueue.poll 就會(huì)阻塞著,這樣就不會(huì)有空推進(jìn)的情況發(fā)送。
我們來(lái)看下推進(jìn)的方法。
- def advanceClock(timeoutMs: Long): Boolean = {
- //從延遲隊(duì)列中獲取槽
- var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
- if (bucket != null) {
- writeLock.lock()
- try {
- while (bucket != null) {
- // 更新每層時(shí)間輪的currentTime
- timingWheel.advanceClock(bucket.getExpiration())
- //因?yàn)楦铝薱urrentTime,進(jìn)行一波任務(wù)的重新插入,來(lái)實(shí)現(xiàn)任務(wù)時(shí)間輪的降級(jí)
- bucket.flush(reinsert)
- //獲取下一個(gè)槽
- bucket = delayQueue.poll()
- }
- } finally {
- writeLock.unlock()
- }
- true
- } else {
- false
- }
- }
- // Try to advance the clock
- def advanceClock(timeMs: Long): Unit = {
- if (timeMs >= currentTime + tickMs) {
- // 必須是tickMs 整數(shù)倍
- currentTime = timeMs - (timeMs % tickMs)
- //推動(dòng)上層時(shí)間輪也更新currentTime
- // Try to advance the clock of the overflow wheel if present
- if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
- }
- }
從上面的 add 方法我們知道每次對(duì)比都是根據(jù)expiration < currentTime + interval 來(lái)進(jìn)行對(duì)比的,而advanceClock 就是用來(lái)推進(jìn)更新 currentTime 的。
小結(jié)一下
Kafka 用了多層次時(shí)間輪來(lái)實(shí)現(xiàn),并且是按需創(chuàng)建時(shí)間輪,采用任務(wù)的絕對(duì)時(shí)間來(lái)判斷延期,并且對(duì)于每個(gè)槽(槽內(nèi)存放的也是任務(wù)的雙向鏈表)都會(huì)維護(hù)一個(gè)過(guò)期時(shí)間,利用 DelayQueue 來(lái)對(duì)每個(gè)槽的過(guò)期時(shí)間排序,來(lái)進(jìn)行時(shí)間的推進(jìn),防止空推進(jìn)的存在。
每次推進(jìn)都會(huì)更新 currentTime 為當(dāng)前時(shí)間戳,當(dāng)然做了點(diǎn)微調(diào)使得 currentTime 是 tickMs 的整數(shù)倍。并且每次推進(jìn)都會(huì)把能降級(jí)的任務(wù)重新插入降級(jí)。
可以看到這里的 DelayQueue 的元素是每個(gè)槽,而不是任務(wù),因此數(shù)量就少很多了,這應(yīng)該是權(quán)衡了對(duì)于槽操作的延時(shí)隊(duì)列的時(shí)間復(fù)雜度與空推進(jìn)的影響。
模擬kafka的時(shí)間輪實(shí)現(xiàn)java版
定時(shí)器
- public class Timer {
- /**
- * 底層時(shí)間輪
- */
- private TimeWheel timeWheel;
- /**
- * 一個(gè)Timer只有一個(gè)delayQueue
- */
- private DelayQueue
delayQueue = new DelayQueue<>(); - /**
- * 過(guò)期任務(wù)執(zhí)行線(xiàn)程
- */
- private ExecutorService workerThreadPool;
- /**
- * 輪詢(xún)delayQueue獲取過(guò)期任務(wù)線(xiàn)程
- */
- private ExecutorService bossThreadPool;
- /**
- * 構(gòu)造函數(shù)
- */
- public Timer() {
- timeWheel = new TimeWheel(1000, 2, System.currentTimeMillis(), delayQueue);
- workerThreadPool = Executors.newFixedThreadPool(100);
- bossThreadPool = Executors.newFixedThreadPool(1);
- //20ms獲取一次過(guò)期任務(wù)
- bossThreadPool.submit(() -> {
- while (true) {
- this.advanceClock(1000);
- }
- });
- }
- /**
- * 添加任務(wù)
- */
- public void addTask(TimerTask timerTask) {
- //添加失敗任務(wù)直接執(zhí)行
- if (!timeWheel.addTask(timerTask)) {
- workerThreadPool.submit(timerTask.getTask());
- }
- }
- /**
- * 獲取過(guò)期任務(wù)
- */
- private void advanceClock(long timeout) {
- try {
- TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
- if (timerTaskList != null) {
- //推進(jìn)時(shí)間
- timeWheel.advanceClock(timerTaskList.getExpiration());
- //執(zhí)行過(guò)期任務(wù)(包含降級(jí)操作)
- timerTaskList.flush(this::addTask);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
任務(wù)
- public class TimerTask {
- /**
- * 延遲時(shí)間
- */
- private long delayMs;
- /**
- * 任務(wù)
- */
- private MyThread task;
- /**
- * 時(shí)間槽
- */
- protected TimerTaskList timerTaskList;
- /**
- * 下一個(gè)節(jié)點(diǎn)
- */
- protected TimerTask next;
- /**
- * 上一個(gè)節(jié)點(diǎn)
- */
- protected TimerTask pre;
- /**
- * 描述
- */
- public String desc;
- public TimerTask(long delayMs, MyThread task) {
- this.delayMs = System.currentTimeMillis() + delayMs;
- this.task = task;
- this.timerTaskList = null;
- this.next = null;
- this.pre = null;
- }
- public MyThread getTask() {
- return task;
- }
- public long getDelayMs() {
- return delayMs;
- }
- @Override
- public String toString() {
- return desc;
- }
- }
時(shí)間槽
- public class TimerTaskList implements Delayed {
- /**
- * 過(guò)期時(shí)間
- */
- private AtomicLong expiration = new AtomicLong(-1L);
- /**
- * 根節(jié)點(diǎn)
- */
- private TimerTask root = new TimerTask(-1L, null);
- {
- root.pre = root;
- root.next = root;
- }
- /**
- * 設(shè)置過(guò)期時(shí)間
- */
- public boolean setExpiration(long expire) {
- return expiration.getAndSet(expire) != expire;
- }
- /**
- * 獲取過(guò)期時(shí)間
- */
- public long getExpiration() {
- return expiration.get();
- }
- /**
- * 新增任務(wù)
- */
- public void addTask(TimerTask timerTask) {
- synchronized (this) {
- if (timerTask.timerTaskList == null) {
- timerTask.timerTaskList = this;
- TimerTask tail = root.pre;
- timerTask.next = root;
- timerTask.pre = tail;
- tail.next = timerTask;
- root.pre = timerTask;
- }
- }
- }
- /**
- * 移除任務(wù)
- */
- public void removeTask(TimerTask timerTask) {
- synchronized (this) {
- if (timerTask.timerTaskList.equals(this)) {
- timerTask.next.pre = timerTask.pre;
- timerTask.pre.next = timerTask.next;
- timerTask.timerTaskList = null;
- timerTask.next = null;
- timerTask.pre = null;
- }
- }
- }
- /**
- * 重新分配
- */
- public synchronized void flush(Consumer
flush) { - TimerTask timerTask = root.next;
- while (!timerTask.equals(root)) {
- this.removeTask(timerTask);
- flush.accept(timerTask);
- timerTask = root.next;
- }
- expiration.set(-1L);
- }
- @Override
- public long getDelay(TimeUnit unit) {
- return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
- }
- @Override
- public int compareTo(Delayed o) {
- if (o instanceof TimerTaskList) {
- return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
- }
- return 0;
- }
- }
時(shí)間輪
- public class TimeWheel {
- /**
- * 一個(gè)時(shí)間槽的范圍
- */
- private long tickMs;
- /**
- * 時(shí)間輪大小
- */
- private int wheelSize;
- /**
- * 時(shí)間跨度
- */
- private long interval;
- /**
- * 時(shí)間槽
- */
- private TimerTaskList[] timerTaskLists;
- /**
- * 當(dāng)前時(shí)間
- */
- private long currentTime;
- /**
- * 上層時(shí)間輪
- */
- private volatile TimeWheel overflowWheel;
- /**
- * 一個(gè)Timer只有一個(gè)delayQueue
- */
- private DelayQueue
delayQueue; - public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue
delayQueue) { - this.currentTime = currentTime;
- this.tickMs = tickMs;
- this.wheelSize = wheelSize;
- this.interval = tickMs * wheelSize;
- this.timerTaskLists = new TimerTaskList[wheelSize];
- //currentTime為tickMs的整數(shù)倍 這里做取整操作
- this.currentTime = currentTime - (currentTime % tickMs);
- this.delayQueue = delayQueue;
- for (int i = 0; i < wheelSize; i++) {
- timerTaskLists[i] = new TimerTaskList();
- }
- }
- /**
- * 創(chuàng)建或者獲取上層時(shí)間輪
- */
- private TimeWheel getOverflowWheel() {
- if (overflowWheel == null) {
網(wǎng)站標(biāo)題:聊一聊時(shí)間輪的實(shí)現(xiàn)
標(biāo)題來(lái)源:http://www.5511xx.com/article/dhhpjdi.html


咨詢(xún)
建站咨詢(xún)
