新聞中心
前言
多線程編程中,為每個任務(wù)分配一個線程是不現(xiàn)實的,線程創(chuàng)建的開銷和資源消耗都是很高的。線程池應(yīng)運而生,成為我們管理線程的利器。Java 通過Executor接口,提供了一種標準的方法將任務(wù)的提交過程和執(zhí)行過程解耦開來,并用Runnable表示任務(wù)。

為永修等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計制作服務(wù),及永修網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為成都網(wǎng)站設(shè)計、做網(wǎng)站、永修網(wǎng)站設(shè)計,以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達到每一位用戶的要求,就會得到認可,從而選擇與我們長期合作。這樣,我們也可以走得更遠!
下面,我們來分析一下 Java 線程池框架的實現(xiàn)ThreadPoolExecutor。
下面的分析基于JDK1.7
生命周期
ThreadPoolExecutor中,使用CAPACITY的高3位來表示運行狀態(tài),分別是:
- RUNNING:接收新任務(wù),并且處理任務(wù)隊列中的任務(wù)
- SHUTDOWN:不接收新任務(wù),但是處理任務(wù)隊列的任務(wù)
- STOP:不接收新任務(wù),不出來任務(wù)隊列,同時中斷所有進行中的任務(wù)
- TIDYING:所有任務(wù)已經(jīng)被終止,工作線程數(shù)量為 0,到達該狀態(tài)會執(zhí)行
terminated() - TERMINATED:
terminated()執(zhí)行完畢
狀態(tài)轉(zhuǎn)換圖
ThreadPoolExecutor中用原子類來表示狀態(tài)位
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
線程池模型
核心參數(shù)
corePoolSize:最小存活的工作線程數(shù)量(如果設(shè)置allowCoreThreadTimeOut,那么該值為 0)maximumPoolSize:最大的線程數(shù)量,受限于CAPACITYkeepAliveTime:對應(yīng)線程的存活時間,時間單位由TimeUnit指定workQueue:工作隊列,存儲待執(zhí)行的任務(wù)RejectExecutionHandler:拒絕策略,線程池滿后會觸發(fā)
線程池的最大容量:CAPACITY中的前三位用作標志位,也就是說工作線程的最大容量為(2^29)-1
四種模型
CachedThreadPool:一個可緩存的線程池,如果線程池的當前規(guī)模超過了處理需求時,那么將回收空閑的線程,當需求增加時,則可以添加新的線程,線程池的規(guī)模不存在任何的限制。FixedThreadPool:一個固定大小的線程池,提交一個任務(wù)時就創(chuàng)建一個線程,直到達到線程池的最大數(shù)量,這時線程池的大小將不再變化。SingleThreadPool:一個單線程的線程池,它只有一個工作線程來執(zhí)行任務(wù),可以確保按照任務(wù)在隊列中的順序來串行執(zhí)行,如果這個線程異常結(jié)束將創(chuàng)建一個新的線程來執(zhí)行任務(wù)。ScheduledThreadPool:一個固定大小的線程池,并且以延遲或者定時的方式來執(zhí)行任務(wù),類似于Timer。
執(zhí)行任務(wù) execute
核心邏輯:
- 當前線程數(shù)量 <
corePoolSize,直接開啟新的核心線程執(zhí)行任務(wù)addWorker(command, true) - 當前線程數(shù)量 >=
corePoolSize,且任務(wù)加入工作隊列成功- 檢查線程池當前狀態(tài)是否處于
RUNNING - 如果否,則拒絕該任務(wù)
- 如果是,判斷當前線程數(shù)量是否為 0,如果為 0,就增加一個工作線程。
- 檢查線程池當前狀態(tài)是否處于
- 開啟普通線程執(zhí)行任務(wù)
addWorker(command, false),開啟失敗就拒絕該任務(wù)
從上面的分析可以總結(jié)出線程池運行的四個階段:
poolSize < corePoolSize且隊列為空,此時會新建線程來處理提交的任務(wù)poolSize == corePoolSize,此時提交的任務(wù)進入工作隊列,工作線程從隊列中獲取任務(wù)執(zhí)行,此時隊列不為空且未滿。poolSize == corePoolSize,并且隊列已滿,此時也會新建線程來處理提交的任務(wù),但是poolSize < maxPoolSizepoolSize == maxPoolSize,并且隊列已滿,此時會觸發(fā)拒絕策略
拒絕策略
前面我們提到任務(wù)無法執(zhí)行會被拒絕,RejectedExecutionHandler是處理被拒絕任務(wù)的接口。下面是四種拒絕策略。
AbortPolicy:默認策略,終止任務(wù),拋出RejectedExceptionCallerRunsPolicy:在調(diào)用者線程執(zhí)行當前任務(wù),不拋異常DiscardPolicy: 拋棄策略,直接丟棄任務(wù),不拋異常DiscardOldersPolicy:拋棄最老的任務(wù),執(zhí)行當前任務(wù),不拋異常
線程池中的 Worker
Worker繼承了AbstractQueuedSynchronizer和Runnable,前者給Worker提供鎖的功能,后者執(zhí)行工作線程的主要方法runWorker(Worker w)(從任務(wù)隊列撈任務(wù)執(zhí)行)。Worker 引用存在workers集合里面,用mainLock守護。
private final ReentrantLock mainLock = new ReentrantLock(); private final HashSetworkers = new HashSet ();
核心函數(shù) runWorker
下面是簡化的邏輯,注意:每個工作線程的run都執(zhí)行下面的函數(shù)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null) {
w.lock();
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
w.unlock();
}
processWorkerExit(w, completedAbruptly);
}- 從
getTask()中獲取任務(wù) - 鎖住 worker
- 執(zhí)行
beforeExecute(wt, task),這是ThreadPoolExecutor提供給子類的擴展方法 - 運行任務(wù),如果該worker有配置了首次任務(wù),則先執(zhí)行首次任務(wù)且只執(zhí)行一次。
- 執(zhí)行
afterExecute(task, thrown); - 解鎖 worker
- 如果獲取到的任務(wù)為 null,關(guān)閉 worker
獲取任務(wù) getTask
線程池內(nèi)部的任務(wù)隊列是一個阻塞隊列,具體實現(xiàn)在構(gòu)造時傳入。
private final BlockingQueueworkQueue;
getTask()從任務(wù)隊列中獲取任務(wù),支持阻塞和超時等待任務(wù),四種情況會導(dǎo)致返回null,讓worker關(guān)閉。
- 現(xiàn)有的線程數(shù)量超過最大線程數(shù)量
- 線程池處于
STOP狀態(tài) - 線程池處于
SHUTDOWN狀態(tài)且工作隊列為空 - 線程等待任務(wù)超時,且線程數(shù)量超過保留線程數(shù)量
核心邏輯:根據(jù)timed在阻塞隊列上超時等待或者阻塞等待任務(wù),等待任務(wù)超時會導(dǎo)致工作線程被關(guān)閉。
timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();在以下兩種情況下等待任務(wù)會超時:
- 允許核心線程等待超時,即
allowCoreThreadTimeOut(true) - 當前線程是普通線程,此時
wc > corePoolSize
工作隊列使用的是BlockingQueue,這里就不展開了,后面再寫一篇詳細的分析。
總結(jié)
ThreadPoolExecutor基于生產(chǎn)者-消費者模式,提交任務(wù)的操作相當于生產(chǎn)者,執(zhí)行任務(wù)的線程相當于消費者。Executors提供了四種基于ThreadPoolExecutor構(gòu)造線程池模型的方法,除此之外,我們還可以直接繼承ThreadPoolExecutor,重寫beforeExecute和afterExecute方法來定制線程池任務(wù)執(zhí)行過程。- 使用有界隊列還是無界隊列需要根據(jù)具體情況考慮,工作隊列的大小和線程的數(shù)量也是需要好好考慮的。
- 拒絕策略推薦使用
CallerRunsPolicy,該策略不會拋棄任務(wù),也不會拋出異常,而是將任務(wù)回退到調(diào)用者線程中執(zhí)行。
新聞名稱:Java線程池框架核心代碼分析
文章網(wǎng)址:http://www.5511xx.com/article/dpjcdch.html


咨詢
建站咨詢
