新聞中心
前言
ThreadPoolExecutor中是如何做到線程復用的?
我們知道,一個線程在創(chuàng)建的時候會指定一個線程任務,當執(zhí)行完這個線程任務之后,線程自動銷毀。但是線程池卻可以復用線程,一個線程執(zhí)行完線程任務后不銷毀,繼續(xù)執(zhí)行另外的線程任務。那么它是如何做到的?這得從addWorker()說起。

讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:域名申請、網頁空間、營銷軟件、網站建設、高陽網站維護、網站推廣。
addWorker()
- 先看上半部分addWorker()。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 對邊界設定的檢查
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}?retry:可能有些同學沒用過,它只是一個標記,它的下一個標記就是for循環(huán),在for循環(huán)里面調用continue/break再緊接著retry標記時,就表示從這個地方開始執(zhí)行continue/break操作,但這不是我們關注的重點。
從上面的代碼,我們可以看出,ThreadPoolExecutor在創(chuàng)建線程時,會將線程封裝成「工作線程worker」,并放入「工作線程組」中,然后這個worker反復從阻塞隊列中拿任務去執(zhí)行。這個addWorker是excute方法中調用的。
- 我們接著看下半部分。
private boolean addWorker(Runnable firstTask, boolean core) {
// 上半部分
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// core是ture,需要創(chuàng)建的線程為核心線程,則先判斷當前線程是否大于核心線程
// 如果core是false,證明需要創(chuàng)建的是非核心線程,則先判斷當前線程數(shù)是否大于總線程數(shù)
// 如果不小于,則返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 下半部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建worker對象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 獲取線程全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 判斷線程池狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將當前線程添加到線程組
workers.add(w);
int s = workers.size();
// 如果線程組中的線程數(shù)大于最大線程池數(shù) largestPoolSize賦值s
if (s > largestPoolSize)
largestPoolSize = s;
// 添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功后執(zhí)行線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 添加失敗后執(zhí)行 addWorkerFailed
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}再看 addWorkerFailed(),與上邊相反,相當于一個回滾操作,會移除失敗的工作線程。
private void addWorkerFailed(Worker w) {
// 同樣需要全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}Worker
我們接著看Worker對象。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
//.....
// 省略下邊代碼
}
Worker類實現(xiàn)了Runnable接口,所以Worker也是一個線程任務。在構造方法中,創(chuàng)建了一個線程,回過頭想想addWorker()里為啥可以t.start()應該很清楚了吧, 并且在構造方法中調用了線程工廠創(chuàng)建了一個線程實例,我們上節(jié)講過線程工廠。其實這也不是關注的重點,重點是這個runWorker()。
final void runWorker(Worker w) {
// 獲取當前的線程實例
Thread wt = Thread.currentThread();
// 直接從第一個任務開始執(zhí)行
Runnable task = w.firstTask;
// 獲取完之后把worker的firstTask置為null 防止下次獲取到
w.firstTask = null;
// 線程啟動之后,通過unlock方法釋放鎖
w.unlock(); // allow interrupts
// 線程異常退出時 為 true
boolean completedAbruptly = true;
try {
// Worker執(zhí)行firstTask或從workQueue中獲取任務,直到任務為空
while (task != null || (task = getTask()) != null) {
// 獲取鎖以防止在任務執(zhí)行過程中發(fā)生中斷
w.lock();
// 判斷邊界值 如果線程池中斷 則中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 相當于鉤子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}?首先去執(zhí)行創(chuàng)建這個worker時就有的任務,當執(zhí)行完這個任務后,worker的生命周期并沒有結束,在while循環(huán)中,worker會不斷地調用getTask方法從「阻塞隊列」中獲取任務然后調用task.run()執(zhí)行任務,從而達到「復用線程」的目的。只要getTask方法不返回null,此線程就不會退出。
我們接著看getTask()?。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果運行線程數(shù)超過了最大線程數(shù),但是緩存隊列已經空了,這時遞減worker數(shù)量。
// 如果有設置允許線程超時或者線程數(shù)量超過了核心線程數(shù)量,并且線程在規(guī)定時間內均未poll到任務且隊列為空則遞減worker數(shù)量
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed為true,則會調用workQueue的poll方法獲取任務.
// 超時時間是keepAliveTime。如果超過keepAliveTime時長,
// 如果timed為false, 則會調用workQueue的take方法阻塞在當前。
// 隊列中有任務加入時,線程被喚醒,take方法返回任務,并執(zhí)行。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}大家有沒有想過這里為啥要用take和poll,它們都是出隊的操作,這么做有什么好處?
take & poll
?我們說take()方法會將核心線程阻塞掛起,這樣一來它就不會占用太多的cpu資源,直到拿到Runnable 然后返回。
如果「allowCoreThreadTimeOut」設置為true,那么核心線程就會去調用poll方法,因為poll可能會返回null,所以這時候核心線程滿足超時條件也會被銷毀。
?非核心線程會workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果超時還沒有拿到,下一次循環(huán)判斷「compareAndDecrementWorkerCount」就會返回null,Worker對象的run()方法循環(huán)體的判斷為null,任務結束,然后線程被系統(tǒng)回收 。
再回頭看一下runWorker()是不是設計的很巧妙。
結束語
本節(jié)內容不是很好理解,想繼續(xù)探討的同學可以繼續(xù)閱讀它的源碼,這部分內容了解一下就好,其實我們從源碼中可以看到大量的線程狀態(tài)檢查,代碼寫的很健壯,可以從中學習一下。
網站名稱:面試官:線程池是如何做到線程復用的?有了解過嗎?
鏈接地址:http://www.5511xx.com/article/djpoppd.html


咨詢
建站咨詢
