新聞中心
背景
這是張小帥失業(yè)之后的第三場面試。

成都創(chuàng)新互聯(lián)是一家專業(yè)提供觀山湖企業(yè)網(wǎng)站建設(shè),專注與成都做網(wǎng)站、成都網(wǎng)站建設(shè)、H5場景定制、小程序制作等業(yè)務。10年已為觀山湖眾多企業(yè)、政府機構(gòu)等服務。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡公司優(yōu)惠進行中。
面試官:“實際開發(fā)中用過多線程吧,那聊聊線程池吧”。
“有CachedThreadPool:可緩存線程池,FixedThreadPool:定長線程池.......balabala”。小帥暗暗竊喜,還好把這幾種線程池背下來了,看來這次可以上岸了。
面試官點點頭,繼續(xù)問到“那線程池底層是如何實現(xiàn)復用的?”
“額,這個....”
寒風中,那個男人的背影在暮色中顯得孤寂而凄涼,仿佛與世隔絕,獨自面對著無盡的寂寞......
概要
如果問到線程池的話,不好好剖析過底層代碼,恐怕真的會像小帥那樣被問翻吧。
那么在此我們就來好好剖析一下線程池的底層吧。我們大概從如下幾個方面著手:
概覽圖
什么是線程池
說到線程池,其實我們要先聊到池化技術(shù)。
池化技術(shù):我們將資源或者任務放入池子,使用時從池中取,用完之后交給池子管理。通過優(yōu)化資源分配的效率,達到性能的調(diào)優(yōu)。
池化技術(shù)優(yōu)點:
- 資源被重復使用,減少了資源在分配銷毀過程中的系統(tǒng)的調(diào)度消耗。比如,在IO密集型的服務器上,并發(fā)處理過程中的子線程或子進程的創(chuàng)建和銷毀過程,帶來的系統(tǒng)開銷將是難以接受的。所以在業(yè)務實現(xiàn)上,通常把一些資源預先分配好,如線程池,數(shù)據(jù)庫連接池,Redis連接池,HTTP連接池等,來減少系統(tǒng)消耗,提升系統(tǒng)性能。
- 池化技術(shù)分配資源,會集中分配,這樣有效避免了碎片化的問題。
- 可以對資源的整體使用做限制,相關(guān)資源預分配且只在預分配后生成,后續(xù)不再動態(tài)添加,從而限制了整個系統(tǒng)對資源的使用上限。
所以我們說線程池是提升線程可重復利用率、可控性的池化技術(shù)的一種。
線程池的使用
1.多線程發(fā)送郵件案例
現(xiàn)在我們有這樣一個場景,上層有業(yè)務系統(tǒng)批量調(diào)用底層進行發(fā)送郵件,廢話不多,直接上代碼:
demo
最終運行輸出結(jié)果為:
由線程:pool-1-thread-1 發(fā)送第:0封郵件
由線程:pool-1-thread-2 發(fā)送第:1封郵件
由線程:pool-1-thread-1 發(fā)送第:2封郵件
由線程:pool-1-thread-2 發(fā)送第:3封郵件
由線程:pool-1-thread-1 發(fā)送第:4封郵件
由線程:pool-1-thread-1 發(fā)送第:6封郵件
由線程:pool-1-thread-2 發(fā)送第:5封郵件
由線程:pool-1-thread-1 發(fā)送第:7封郵件
由線程:pool-1-thread-2 發(fā)送第:8封郵件
由線程:pool-1-thread-1 發(fā)送第:9封郵件
上面的例子中從結(jié)果來看是10封郵件分別由兩條線程發(fā)送出去了,上圖可見,我們給ThreadPoolExecutor這個執(zhí)行器分別指定了七個參數(shù)。那么參數(shù)的含義到底是什么呢?接下來咱們層層抽絲剝繭。
2.構(gòu)造函數(shù)說明
大家估計會有疑問,線程池的種類那么多,案例中為什么要用TheadPoolExecutor類呢,其他的種類是由TheadPoolExecutor通過不同的入?yún)⒍x出來的,所以我們直接拿ThreadPoolExecutor來看。
我們先來看一下ThreadPoolExecutor的繼承關(guān)系,有個宏觀印象:
宏觀繼承
我們再來看一下ThreadPoolExecutor的構(gòu)造方法:
構(gòu)造方法
下面我們來解釋一下幾個參數(shù)的含義:
- corePoolSize:核心線程數(shù)。
- maximumPoolSize:最大線程數(shù)。
- keepAliveTime:線程池中線程的最大閑置生命周期。
- unit:針對keepAliveTime的時間單位。
- workQueue:阻塞隊列。
- threadFactory:創(chuàng)建線程的線程工廠。
- handler:拒絕策略。
大家對上述的含義初步有個概念。
3.工作流程概述
看了上面的構(gòu)造函數(shù)字段大家估計也還是優(yōu)點懵的,尤其是從來沒有接觸過商品池的小伙伴。所以老貓又擼了一張商品池的大概的工作流程圖,方便大家把這些概念串起來。
大概流程
上圖中老貓標記了四條線,簡單介紹一下(當然上圖若有問題,也希望大家能夠指出來)。
- 當發(fā)起任務時候,會計算線程池中存在的線程數(shù)量與核心線程數(shù)量(corePoolSize)進行比較,如果小于,則在線程池中創(chuàng)建線程,否則,進行下一步判斷。
- 如果不滿足條件1,則會將任務添加到阻塞隊列中。等待線程池中的線程空閑下來后,獲取隊列中的任務進行執(zhí)行。
- 但是條件2中如果阻塞隊列滿了之后,此時又會重新獲取當前線程的數(shù)量和最大線程數(shù)(maximumPoolSize)進行比較,如果發(fā)現(xiàn)小于最大線程數(shù),那么繼續(xù)添加到線程池中即可。
- 如果都不滿足上述條件,那么此時會放到拒絕策略中。
4.execute核心流程剖析
接下來我們來看一下執(zhí)行theadPoolExecutor.execute()的時候到底發(fā)生了什么。先來看一下源碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
(1) ctl變量
進入執(zhí)行源碼之后我們首先看到的是ctl,只知道ctl中拿到了一個int數(shù)據(jù)至于這個數(shù)值有什么用,目前不知道,接著看涉及的相關(guān)代碼,老貓將相關(guān)的代碼解讀放到源碼中進行注釋。
//通過ctl獲取線程池的狀態(tài)以及包含的線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // COUNT_BITS = 32-3 = 29
/**001左移29位
* 00100000 00000000 00000000 00000000
* 操作減1
* 00011111 11111111 11111111 11111111(表示初始化的時候線程情況,1表示均有空閑線程)
* 換成十進制:COUNT_MASK = 536870911
*/
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
/**
* 運行中狀態(tài)
* 1的原碼
* 00000000 00000000 00000000 00000001
* 取反+1
* 11111111 11111111 11111111 11111111
* 左移29位
* 11100000 00000000 00000000 00000000
**/
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //運行中狀態(tài) 11100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS; //終止狀態(tài) 00000000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS; //停止 00100000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS; // 01000000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS; // 01100000 00000000 00000000 00000000
//取高3位表示獲取運行狀態(tài)
private static int runStateOf(int c) { return c & ~COUNT_MASK; } //~COUNT_MASK表示取反碼:11100000 00000000 00000000 00000000
//取出低位29位的值,當前活躍的線程數(shù)
private static int workerCountOf(int c) { return c & COUNT_MASK; } //COUNT_MASK:00011111 11111111 11111111 11111111
//計算ctl的值,ctl=[3位]線程池狀態(tài) + [29位]線程池中線程數(shù)量。
private static int ctlOf(int rs, int wc) { return rs | wc; } //進行或運算
上面我們針對各個狀態(tài)以及那么多的二進制表示符有點懵,當然如果不會二進制運算的,大家可以先自己去了解一下二進制的運算邏輯。通過源碼中的英文,我們知道CTL的值其實分成兩部分組成,高三位是狀態(tài),其余均為當前線程數(shù)。如下的圖:
線程池狀態(tài)
上面的圖的描述解釋,其實也都是英文注釋版的翻譯,我們再來看一下有了這些狀態(tài),這些狀態(tài)是怎么流轉(zhuǎn)的,英文注釋是這樣的:
/*** RUNNING -> SHUTDOWN
* On invocation of shutdown()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
* /
上面的描述不太直觀,老貓將流程串了起來,得到了下面的狀態(tài)機流轉(zhuǎn)圖。如下圖:
狀態(tài)機流程
寫到這里,其實ctl已經(jīng)很清楚了,ctl說白了就是狀態(tài)位和活躍線程數(shù)的表示方式。通過ctl咱們可以知道當前是什么狀態(tài)以及活躍線程數(shù)量是多少 (設(shè)計很巧妙,如果此處還有問題,歡迎大家私聊老貓)。
(3) 線程池中的線程數(shù)小于核心線程數(shù)
讀完ctl之后,我們來看一下接下來的代碼。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; //添加新的線程
c = ctl.get(); //重新獲取當前的狀態(tài)以及線程數(shù)量
}
繼上述的workerCountOf,我們知道這個方法可以獲取當前活躍的線程數(shù)。如果當前線程數(shù)小于配置的核心線程數(shù),則會調(diào)用addWorker進行添加新的線程。如果添加失敗了,則重新獲取ctl的值。
(4) 任務添加到隊列的相關(guān)邏輯
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次check一下,當前線程池是否是運行狀態(tài),如果不是運行時狀態(tài),則把剛剛添加到workQueue中的command移除掉
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
上述我們知道當添加線程池失敗的時候,我們會重新獲取ctl的值。此時咱們的第一步就很清楚了:
- 通過isRunning方法來判斷線程池狀態(tài)是不是運行中狀態(tài),如果是,則將command任務放到阻塞隊列workQueue中。
- 再次check一下,當前線程池是否是運行狀態(tài),如果不是運行時狀態(tài),則把剛剛添加到workQueue中的command移除掉,并調(diào)用拒絕策略。否則,判斷如果當前活動的線程數(shù)如果為0,則表明只去創(chuàng)建線程,而此處,并不執(zhí)行任務(因為,任務已經(jīng)在上面的offer方法中被添加到了workQueue中了,等待線程池中的線程去消費隊列中的任務)
(5) 線程池中的線程數(shù)量小于最大線程數(shù)代碼邏輯以及拒絕策略的代碼邏輯
接下來,我們看一下最后的一個步驟
/**
* 進入第三步驟前提:
* 1.線程池不是運行狀態(tài),所以isRunning(c)為false
* 2.workCount >= corePoolSize的時候于此同時并且添加到queue失敗的時候執(zhí)行
*/
else if (!addWorker(command, false))
reject(command);
}
由于調(diào)用addWorker的第二個參數(shù)是false,則表示對比的是最大線程數(shù),那么如果往線程池中創(chuàng)建線程依然失敗,即addWorker返回false,那么則進入if語句中,直接調(diào)用reject方法調(diào)用拒絕策略了。
寫到這里大家估計會對這個第二個參數(shù)是false為什么比較的是最大線程數(shù)有疑問。其實這個是addWorker中的方法。我們可以大概看一下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
}
我們很明顯地看到當core為flase的時候咱們獲取的是maximumPoolSize,也就是最大線程數(shù)。
寫到這里,其實咱們的核心主流程大概就已經(jīng)結(jié)束了。這里其實老貓也只是寫了一個算是比較入門的開頭。當然我們還可以再深入去理addWorker的源碼。這個其實就交給大家去細看了,篇幅過長,相信大家也會失去閱讀的興趣了,感興趣的可以自己研究一下,如果說還是有問題的,可以找老貓一起探討,老貓的公眾號:"程序員老貓"。老貓覺得在上述的源碼中比較重要的其實就是ctl值的流轉(zhuǎn)順序以及計算方式,讀懂這個的話,后面一切的源碼只要順藤摸瓜即可理解。
5.Executors線程池模板
我們上述主要和大家分享了比較核心的theadPoolExecutor。除此之外,線程池Executors里面包含了很多其他的線程池模板。當然這也是小貓直接面試的時候說的那些,其實小貓也就僅僅只是背了線程池模板而已,并不知曉其工作原理。如下幾種:
- newCachedThreadPool 創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
- newFixedThreadPool 創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
- newScheduledThreadPool 創(chuàng)建一個定長線程池,支持定時及周期性任務執(zhí)行。
- newSingleThreadScheduleExecutor 創(chuàng)建一個單線程執(zhí)行程序,它可安排在給定延遲后運行命令或者定期地執(zhí)行。(注意,如果因為在關(guān)閉前的執(zhí)行期間出現(xiàn)失敗而終止了此單個線程,那么如果需要,一個新線程會代替它執(zhí)行后續(xù)的任務)??杀WC順序地執(zhí)行各個任務,并且在任意給定的時間不會有多個線程是活動的。與其他等效的 newScheduledThreadPool(1) 不同,可保證無需重新配置此方法所返回的執(zhí)行程序即可使用其他的線程。
- newSingleThreadExecutor 創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
6.多樣化的blockingQueue
- PriorityBlockingQueue 它是一個無界的并發(fā)隊列。無法向這個隊列中插入null值。所有插入到這個隊列中的元素必須實現(xiàn)Comparable接口。因此該隊列中元素的排序就取決于你自己的Comparable實現(xiàn)。
- SynchronousQueue 它是一個特殊的隊列,它的內(nèi)部同時只能夠容納單個元素。如果該隊列已有一個元素的話,那么試圖向隊列中插入一個新元素的線程將會阻塞,直到另一個新線程將該元素從隊列中抽走。同樣的,如果隊列為空,試圖向隊列中抽取一個元素的線程將會被阻塞,直到另一個線程向隊列中插入了一條新的元素。因此,它其實不太像是一個隊列,而更像是一個匯合點。
- ArrayBlockingQueue 它是一個有界的阻塞隊列,其內(nèi)部實現(xiàn)是將對象放到一個數(shù)組里。一但初始化,大小就無法修改
- LinkedBlockingQueue 它內(nèi)部以一個鏈式結(jié)構(gòu)(鏈接節(jié)點)對其元素進行存儲。可以指定元素上限,否則,上限則為Integer.MAX_VALUE。
- DelayQueue 它對元素進行持有直到一個特定的延遲到期。注意:進入其中的元素必須實現(xiàn)Delayed接口。
上述針對這些羅列了一下,其實很多官網(wǎng)上也有相關(guān)的介紹,當然感興趣的小伙伴也可以再去刨一刨里面的源碼實現(xiàn)。
7.拒絕策略
- AbortPolicy 丟棄任務并拋出RejectedExecutionException異常。
- DiscardPolicy 丟棄任務,但是不拋出異常。
- DiscardOldestPolicy 丟棄隊列中最前面的任務,然后重新嘗試執(zhí)行任務。
- CallerRunsPolicy 由調(diào)用線程處理該任務。
總結(jié)
很多小伙伴在用一些線程池或者第三方中間件的時候可能只停留在如何使用上,一旦出了問題或者被人深入問到其實現(xiàn)原理的時候就比較頭大。所以在日常開發(fā)的過程中,我們不僅僅需要知道如何去用,其實更應該知道底層的原理是什么。這樣才能長立于不敗之地。
新聞標題:背會了常見的幾個線程池用法,結(jié)果被問翻
文章URL:http://www.5511xx.com/article/cdgeeso.html


咨詢
建站咨詢
