日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
ConcurrentInJava

這一系列只是對(duì)JUC各個(gè)部分做了說(shuō)明和介紹,沒人深入原理!

創(chuàng)新互聯(lián)公司主要從事成都網(wǎng)站制作、做網(wǎng)站、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)嵩縣,10年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來(lái)電咨詢建站服務(wù):13518219792

concurrent并發(fā)包,讓你易于編寫并發(fā)程序。并發(fā)下我們經(jīng)常需要使用的基礎(chǔ)設(shè)施和解決的問(wèn)題有ThreadPool、Lock、管道、集合點(diǎn)、線程之間等待和喚醒、線程間數(shù)據(jù)傳輸、共享資源訪問(wèn)控制、并發(fā)線程之間的相互等待,等待。

concurrent提供的工具能夠解決絕大部分的場(chǎng)景,還能提高程序吞吐量。

現(xiàn)代的服務(wù)器多采用多核CPU,從而不同線程之間有可能真正地在同時(shí)運(yùn)行而不是cpu時(shí)間切片。在處理大計(jì)算量的程序上要盡可能利用CPU多核特性,提高系統(tǒng)吞吐量。

并發(fā)編程主要面臨三個(gè)問(wèn)題:

1.如何讓多個(gè)線程同時(shí)為同一個(gè)任務(wù)工作(并發(fā)編程設(shè)計(jì))

2.多個(gè)線程之間對(duì)共享資源的爭(zhēng)用。

3.多個(gè)線程之間如何相互合作、傳遞數(shù)據(jù)。

1. concurrent包提供的集合

concurrent包直接提供了標(biāo)準(zhǔn)集合的一些實(shí)現(xiàn),在下面做簡(jiǎn)單介紹。在大部分情況下可以使用它們提供高并發(fā)環(huán)境下對(duì)集合訪問(wèn)的吞吐量。

1.1 ConcurrentHashMap

Map的一個(gè)并發(fā)實(shí)現(xiàn)。在多線程環(huán)境下,它具有很高的吞吐量和具備可靠的數(shù)據(jù)一致性。它支持并發(fā)讀和一定程度的并發(fā)修改(默認(rèn)16個(gè)并發(fā),可以通過(guò)構(gòu)造函數(shù)修改)。

HashMap的實(shí)現(xiàn)是非線程安全的,高并發(fā)下會(huì)get方法常會(huì)死鎖,有的時(shí)候會(huì)表現(xiàn)為CPU居高不下。

 
 
 
 
  1. public V get(Object key) {  
  2. if (key == null)  
  3. return getForNullKey();  
  4. int hash = hash(key.hashCode());  
  5. for (Entry e = table[indexFor(hash, table.length)];  
  6. e != null;  
  7. e = e.next) {  
  8. Object k;  
  9. if (e.hash == hash && ((k = e.key) == key || key.equals(k)))  
  10. return e.value;  
  11. }  
  12. return null;  
  13. }  
  14.  

在get操作里面for循環(huán)取對(duì)象的操作,由于高并發(fā)同時(shí)讀寫,for循環(huán)的結(jié)果變得不可預(yù)知,所以有可能一直循環(huán)。

所以高并發(fā)環(huán)境下盡量不要直接使用HashMap,對(duì)系統(tǒng)造成的影響很難排除。

和Collections.synchronizedMap(new HashMap(...))相比,外ConcurrentHashMap在高并發(fā)的環(huán)境下有著更優(yōu)秀的吞吐量。因?yàn)镃oncurrentHashMap可以支持寫并發(fā),基本原理是內(nèi)部分段,分段的數(shù)量決定著并發(fā)程度。通過(guò)concurrencyLevel參數(shù)可以設(shè)置。如果你能預(yù)期并發(fā)數(shù)量那么設(shè)置該參數(shù)可以獲取更優(yōu)吞吐量。

另外為ConcurrentHashMap還實(shí)現(xiàn)了:

V putIfAbsent(K key, V value);

boolean remove(Object key, Object value);

boolean replace(K key, V oldValue, V newValue);

V replace(K key, V value);

這四個(gè)一致性的操作方法。

1.2 BlockingQueue

BlockingQueue定義了一個(gè)接口,繼承了Queue接口。Queue是一種數(shù)據(jù)結(jié)構(gòu),意思是它的項(xiàng)以先入先出(FIFO)順序存儲(chǔ)。

BlockingQueue為我們提供了一些多線程阻塞語(yǔ)義的方法,新增和重定義了一些方法插入:

BlockingQueue是線程安全的,非常適合多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者線程之間傳遞數(shù)據(jù)。

形象地理解,BlockingQueue好比有很多格子的傳輸帶系統(tǒng),不過(guò)當(dāng)你(生產(chǎn)者)調(diào)用put方法的時(shí)候,如果有空閑的格子那么放入物體后立刻返回,如果沒有空閑格子那么一直處于等待狀態(tài)。add方法意味著如果沒有空閑格子系統(tǒng)就會(huì)報(bào)警,然后如果處理該報(bào)警則按照你的意愿。offer方法優(yōu)先于add方法,它通過(guò)返回true 或 flase來(lái)告訴你是否放入成功。offer超時(shí)方法,如果不空閑的情況下,嘗試等待一段時(shí)間。

BlockingQueue有很多實(shí)現(xiàn)ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue

補(bǔ)充Dueue是個(gè)雙向隊(duì)列,可以當(dāng)做堆棧來(lái)使用。

BlockingQueue在ThreadPool中,作為任務(wù)隊(duì)列來(lái)使用,用來(lái)保存沒有立刻執(zhí)行的工作任務(wù)對(duì)象。

1.3 SynchronousQueue

SychronousQueue是BlockingQueue的一個(gè)實(shí)現(xiàn),它看起來(lái)是一個(gè)隊(duì)列,但是其實(shí)沒有容量,是特定條件下的一個(gè)精簡(jiǎn)實(shí)現(xiàn)。

做個(gè)比喻,SychronousQueue對(duì)象就像一個(gè)接力棒,現(xiàn)在有兩個(gè)運(yùn)動(dòng)員交棒者和接棒者(線程)要做交接。在交接點(diǎn),交棒者沒有交出之前是不能松開的(一種等待狀態(tài)),接棒者在接到棒之前是必須等待。換一句話說(shuō)不管誰(shuí)先到交接點(diǎn),必須處于等待狀態(tài)。

在生產(chǎn)者和消費(fèi)者模型中。如果生產(chǎn)者向SychronousQueue進(jìn)行put操作,直到有另外的消費(fèi)者線程進(jìn)行take操作時(shí)才能返回。對(duì)消費(fèi)者也是一樣,take操作會(huì)被阻塞,直到生產(chǎn)者put。

在這種生產(chǎn)者-消費(fèi)者模型下,生產(chǎn)者和消費(fèi)者是進(jìn)行手對(duì)手傳遞產(chǎn)品,在消費(fèi)者消費(fèi)一個(gè)產(chǎn)品之前,生產(chǎn)者必須處于等待狀態(tài)。它給我們提供了在線程之間交換單一元素的極輕量級(jí)方法,并且具有阻塞語(yǔ)義。

提示:上面舉例中有寫局限性。其實(shí)生產(chǎn)者和消費(fèi)者進(jìn)程是可以任意數(shù)量的。M:N。生產(chǎn)線程之間會(huì)對(duì)SychronousQueue進(jìn)行爭(zhēng)用,消費(fèi)者也是一樣。

對(duì)SychronousQueue類似于其他語(yǔ)境中“會(huì)合通道”或 “連接”點(diǎn)問(wèn)題。它非常適合于傳遞性設(shè)計(jì),在這種設(shè)計(jì)中,在一個(gè)線程中運(yùn)行的對(duì)象要將某些信息、事件或任務(wù)傳遞給在另一個(gè)線程中運(yùn)行的對(duì)象,它就必須與該對(duì)象同步。

1.4Exchanger

是SychronousQueue的雙向?qū)崿F(xiàn)。用來(lái)伙伴線程間交互對(duì)象。Exchanger 可能在比如遺傳算法和管道設(shè)計(jì)中很有用。

形象地說(shuō),就是兩個(gè)人在預(yù)定的地方交互物品,任何一方?jīng)]到之前都處于等待狀態(tài)。

1.5 CopyOnWriteArrayList 和 CopyOnWriteArraySet

它們分別是List接口和Set接口的實(shí)現(xiàn)。正如類名所描述的那樣,當(dāng)數(shù)據(jù)結(jié)構(gòu)發(fā)生變化的時(shí)候,會(huì)復(fù)制自身的內(nèi)容,來(lái)保證一致性。大家都知道復(fù)制全部副本是非常昂貴的操作,看來(lái)這是一個(gè)非常不好的實(shí)現(xiàn)。事實(shí)上沒有最好和最差的方案,只有最合適的方案。一般情況下,處理多線程同步問(wèn)題,我們傾向使用同步的 ArrayList,但同步也有其成本。

那么在什么情況下使用CopyOnWriteArrayList 或者CopyOnWriteArraySet呢?

數(shù)據(jù)量小。

對(duì)數(shù)據(jù)結(jié)構(gòu)的修改是偶然發(fā)生的,相對(duì)于讀操作。

舉例來(lái)說(shuō),如果我們實(shí)現(xiàn)觀察者模式的話,作為監(jiān)聽器集合是非常合適的。

1.6 TimeUnit

雖然是個(gè)時(shí)間單位,但是它也是concurrent包里面的。也許你以前的代碼里面經(jīng)常出現(xiàn)1*60*1000來(lái)表示一分鐘,代碼可讀性很差?,F(xiàn)在你可以通過(guò)TimeUnit來(lái)編寫可讀性更好的代碼,concurrent的api里面涉及到時(shí)間的地方都會(huì)使用該對(duì)象。

我之所以先進(jìn)并發(fā)框架常用的集合,是因?yàn)榫€程池的實(shí)現(xiàn)特性都利用了BlockingQueue的一些特性。

#p#

2. ThreadPool

雖然線程和進(jìn)程相比是輕量級(jí)許多,但是線程的創(chuàng)建成本還是不可忽律,所以就有了線程池化的設(shè)計(jì)。線程池的創(chuàng)建、管理、回收、任務(wù)隊(duì)列管理、任務(wù)分配等細(xì)節(jié)問(wèn)題依然負(fù)責(zé),沒有必要重復(fù)發(fā)明輪子,concurrent包已經(jīng)為我們準(zhǔn)備了一些優(yōu)秀線程池的實(shí)現(xiàn)。

2.1 認(rèn)識(shí)ExecutorService 接口

ExecutorService 接口,它能提供的功能就是用來(lái)在將來(lái)某一個(gè)時(shí)刻異步地執(zhí)行一系列任務(wù)。雖然簡(jiǎn)單一句話,但是包含了很多需求點(diǎn)。它的實(shí)現(xiàn)至少包含了線程池和任務(wù)隊(duì)列兩個(gè)方面,其實(shí)還包括了任務(wù)失敗處理策略等。

經(jīng)常使用submit方法,用來(lái)提交任務(wù)對(duì)象。

簡(jiǎn)單的例子:

ExecutorService es = Executors.newCachedThreadPool();

es.submit(new Runnable(){

@Override

public void run() {

System.out.println("do some thing");

}

});

es.shutdown();

上面的例子只是完成了提交了一個(gè)任務(wù),異步地去執(zhí)行它。但是有些使用場(chǎng)景更為復(fù)雜,比如等待獲得異步任務(wù)的返回結(jié)果,或者最多等上固定的時(shí)間。

submit 方法返回一個(gè)對(duì)象,F(xiàn)uture??雌饋?lái)有點(diǎn)別扭,代表將來(lái)的對(duì)象。其實(shí)看一下Future的方法就明白了。

其實(shí)Future對(duì)象代表了一個(gè)異步任務(wù)的結(jié)果,可以用來(lái)取消任務(wù)、查詢?nèi)蝿?wù)狀態(tài),還有通過(guò)get方法獲得異步任務(wù)返回的結(jié)果。當(dāng)調(diào)用get方法的時(shí)候,當(dāng)前線程被阻塞直到任務(wù)被處理完成或者出現(xiàn)異常。

我們可以通過(guò)保存Future對(duì)象來(lái)跟蹤查詢異步任務(wù)的執(zhí)行情況。

顯然Runnable接口中定義的 public void run();方法并不能返回結(jié)果對(duì)象,所以concurrent包提供了Callable接口,它可以被用來(lái)返回結(jié)果對(duì)象。

2.2 ThreadPoolExecutor

ThreadPoolExecutor實(shí)現(xiàn)了ExecutorService 接口,也是我們最主要使用的實(shí)現(xiàn)類。

首先非常有必要看一些類的最完整的構(gòu)造函數(shù)

 
 
 
 
  1. ThreadPoolExecutor(int corePoolSize,  
  2.  
  3.    int maximumPoolSize,  
  4.  
  5. long keepAliveTime,  
  6.  
  7. TimeUnit unit,  
  8.  
  9. BlockingQueue  workQueue,  
  10.  
  11. ThreadFactory threadFactory,  
  12.  
  13. RejectedExecutionHandler handler)  
  14.  

ThreadPoolExecutor對(duì)象中有個(gè)poolSize變量表示當(dāng)前線程池中正在運(yùn)行的線程數(shù)量。

注意:這個(gè)有關(guān)非常重要的關(guān)系,常常被誤解。poolSize變量和corePoolSize、maximumPoolSize以及workQueue的關(guān)系。

首先線程池被創(chuàng)建初期,還沒有執(zhí)行任何任務(wù)的時(shí)候,poolSize等于0;

每次向線程池提交任務(wù)的時(shí)候,線程池處理過(guò)程如下:

1. 如果poolSize少于 corePoolSize,則首選添加新的線程,而不進(jìn)行排隊(duì)。

2. 如果poolSize等于或多于 corePoolSize,則首選將請(qǐng)求加入隊(duì)列workQueue,而不添加新的線程。

3. 如果第二步執(zhí)行失敗(隊(duì)已滿),則創(chuàng)建新的線程執(zhí)行任務(wù),但是如果果poolSize已經(jīng)達(dá)到maximumPoolSize,那么就拒絕該任務(wù)。如果處理被拒絕的任務(wù)就取決于RejectedExecutionHandler handler的設(shè)置了,默認(rèn)情況下會(huì)拋出異常。

系統(tǒng)存在四種任務(wù)拒絕策略:

在默認(rèn)的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運(yùn)行時(shí) RejectedExecutionException。

在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調(diào)用運(yùn)行該任務(wù)的 execute 本身。此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。

在 ThreadPoolExecutor.DiscardPolicy 中,不能執(zhí)行的任務(wù)將被刪除。

在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過(guò)程)。

keepAliveTime活動(dòng)線程如果空閑一段時(shí)間是否可以回收,通常只作用于超出corePoolSize的線程。corePoolSize的線程創(chuàng)建了就不會(huì)被回收。但是到j(luò)ava 6 之后增加了public void allowCoreThreadTimeOut(boolean value)方法,允許core進(jìn)程也可以根據(jù)keepAliveTime來(lái)回收,默認(rèn)為false。

決定線程池特性的還有workQueue的實(shí)現(xiàn)類,有三種類SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue,分別對(duì)應(yīng)同步隊(duì)列、無(wú)界隊(duì)列、有界隊(duì)列。

(摘自JavaDoc)

類SynchronousQueue,直接提交。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗,因此會(huì)構(gòu)造一個(gè)新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請(qǐng)求集時(shí)出現(xiàn)鎖。直接提交通常要求無(wú)界 maximumPoolSizes 以避免拒絕新提交的任務(wù)(設(shè)置maximumPoolSizes 為Integer.MAX_VALUE)。當(dāng)命令以超過(guò)隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無(wú)界線程具有增長(zhǎng)的可能性。

LinkedBlockingQueue,無(wú)界隊(duì)列。使用無(wú)界隊(duì)列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待。這樣,創(chuàng)建的線程就不會(huì)超過(guò) corePoolSize。(因此,maximumPoolSize 的值也就無(wú)效了。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時(shí),適合于使用無(wú)界隊(duì)列;例如,在 Web 頁(yè)服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請(qǐng)求,當(dāng)命令以超過(guò)隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無(wú)界線程具有增長(zhǎng)的可能性。

ArrayBlockingQueue,有界隊(duì)列。當(dāng)使用有限的 maximumPoolSizes 時(shí),有界隊(duì)列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過(guò)您許可的更多線程安排時(shí)間。使用小型隊(duì)列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會(huì)降低吞吐量。

綜上:構(gòu)造參數(shù)的設(shè)置是互相制約和影響的。只有當(dāng)你重復(fù)了解其相互關(guān)系的時(shí)候、或有特殊需求的時(shí)候,才可以自己構(gòu)造ThreadPoolExecutor對(duì)象,否則可以使用Executores是個(gè)工廠類。

提示使用線程池是注意處理shutdown,確保你系統(tǒng)關(guān)閉的時(shí)候主動(dòng)關(guān)閉shutdown。

2.3 ScheduledExecutorService

擴(kuò)展了ExecutorService接口,提供時(shí)間排程的功能。

schedule方法被用來(lái)延遲指定時(shí)間來(lái)執(zhí)行某個(gè)指定任務(wù)。如果你需要周期性重復(fù)執(zhí)行定時(shí)任務(wù)可以使用scheduleAtFixedRate或者scheduleWithFixedDelay方法,它們不同的是前者以固定頻率執(zhí)行,后者以相對(duì)固定頻率執(zhí)行。

(感謝wenbois2000 提出原先的錯(cuò)誤,我在這里重新描述!對(duì)于原先的錯(cuò)誤,實(shí)在不好意思啊,再次感謝!)不管任務(wù)執(zhí)行耗時(shí)是否大于間隔時(shí)間,scheduleAtFixedRate和scheduleWithFixedDelay都不會(huì)導(dǎo)致同一個(gè)任務(wù)并發(fā)地被執(zhí)行。唯一不同的是scheduleWithFixedDelay是當(dāng)前一個(gè)任務(wù)結(jié)束的時(shí)刻,開始結(jié)算間隔時(shí)間,如0秒開始執(zhí)行第一次任務(wù),任務(wù)耗時(shí)5秒,任務(wù)間隔時(shí)間3秒,那么第二次任務(wù)執(zhí)行的時(shí)間是在第8秒開始。

ScheduledExecutorService的實(shí)現(xiàn)類,是ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor對(duì)象包含的線程數(shù)量是沒有可伸縮性的,只會(huì)有固定數(shù)量的線程。不過(guò)你可以通過(guò)其構(gòu)造函數(shù)來(lái)設(shè)定線程的優(yōu)先級(jí),來(lái)降低定時(shí)任務(wù)線程的系統(tǒng)占用。

特別提示:通過(guò)ScheduledExecutorService執(zhí)行的周期任務(wù),如果任務(wù)執(zhí)行過(guò)程中拋出了異常,那么過(guò)ScheduledExecutorService就會(huì)停止執(zhí)行任務(wù),且也不會(huì)再周期地執(zhí)行該任務(wù)了。所以你如果想保住任務(wù)都一直被周期執(zhí)行,那么catch一切可能的異常。

2.4 Executors

Executores是個(gè)工廠類,用來(lái)生成ThreadPoolExecutor對(duì)象,它提供了一些常用的線程池配置方案,滿足我們大部分場(chǎng)景。

1. newCachedThreadPool

 
 
 
 
  1. public static ExecutorService newCachedThreadPool() {  
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  3. 60L, TimeUnit.SECONDS,  
  4. new SynchronousQueue());  
  5. }  
  6.  

分析下出這個(gè)線程池配置的工作模式,當(dāng)沒有空閑進(jìn)程時(shí)就新建線程執(zhí)行,當(dāng)有空閑線程時(shí)就使用空閑線程執(zhí)行。當(dāng)線程空閑大60秒時(shí),系統(tǒng)自動(dòng)回收線程。

該線程池非常適合執(zhí)行短小異步任務(wù)時(shí)吞吐量非常高,會(huì)重復(fù)利用CPU的能力。但是如果任務(wù)處理IO邊界任務(wù),那么會(huì)消耗大量線程切換,降低系統(tǒng)吞吐量。所以執(zhí)行短小的計(jì)算任務(wù)非常高效,且當(dāng)沒有任務(wù)時(shí)不會(huì)消耗系統(tǒng)資源。

注意:線程池中沒有變量表示線程是否空閑。那么程序是如何控制的呢?不得不贊嘆concurrent實(shí)現(xiàn)的非常精巧。當(dāng)創(chuàng)建出來(lái)的線程完成原來(lái)的任務(wù)后,會(huì)調(diào)用BlockingQueue的Poll方法,對(duì)于SynchronousQueue實(shí)現(xiàn)而言會(huì)阻塞調(diào)用線程,直到另外的線程offer調(diào)用。

然而ThreadPool在分配任務(wù)的時(shí)候總是先去嘗試調(diào)用offer方法,所以就會(huì)觸發(fā)空閑線程再次調(diào)用。

精妙的是ThreadPoolExecutor的處理邏輯一樣,但是用BlockingQueue實(shí)現(xiàn)變了就產(chǎn)生不同的行為。

2. newFixedThreadPool

 
 
 
 
  1. public static ExecutorService newFixedThreadPool(int nThreads) {  
  2.  
  3. return new ThreadPoolExecutor(nThreads, nThreads,  
  4.  
  5. 0L, TimeUnit.MILLISECONDS,  
  6.  
  7. new LinkedBlockingQueue());  
  8.  
  9. }  
  10.  

創(chuàng)建固定線程數(shù)量的線程池,采用無(wú)界隊(duì)列,當(dāng)有更多任務(wù)的時(shí)候?qū)⒈环湃牍ぷ麝?duì)列中排隊(duì)。如果線程池不經(jīng)常執(zhí)行任務(wù)時(shí),你可以調(diào)用allowCoreThreadTimeOut(boolean value)的方法讓系統(tǒng)自動(dòng)回收core的進(jìn)程,以節(jié)約系統(tǒng)資源。

3. newSingleThreadExecutor

 
 
 
 
  1. public static ExecutorService newSingleThreadExecutor() {  
  2.  
  3. return new FinalizableDelegatedExecutorService  
  4.  
  5. (new ThreadPoolExecutor(1, 1,  
  6.  
  7. 0L, TimeUnit.MILLISECONDS,  
  8.  
  9. new LinkedBlockingQueue()));  
  10.  
  11. }  
  12.  

只有一個(gè)工作線程的線程池。和newFixedThreadPool(1)相比,不同之處有兩點(diǎn):

1. 不可以重新配置newSingleThreadExecutor創(chuàng)建出來(lái)的線程池。

2. 當(dāng)創(chuàng)建出來(lái)的線程池對(duì)象被GC回收時(shí),會(huì)自動(dòng)調(diào)用shutdown方法。

4.newScheduledThreadPool

 
 
 
 
  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
  2.        return new ScheduledThreadPoolExecutor(corePoolSize);  
  3.    }  

生成一個(gè)可以執(zhí)行時(shí)間調(diào)度的線程池。其實(shí)內(nèi)部使用無(wú)界工作隊(duì)列,線程數(shù)量最多能達(dá)到corePoolSize。

2.5 ExecutorCompletionService

這是個(gè)巧妙的設(shè)計(jì),內(nèi)部維護(hù)了一已經(jīng)完成了任務(wù)結(jié)果隊(duì)列,通過(guò)take方法可以同步地等待一個(gè)個(gè)結(jié)果對(duì)象。

詳情見http://www.oschina.net/uploads/doc/javase-6-doc-api-zh_CN/java/util/concurrent/ExecutorCompletionService.html

#p#

3. java.util.concurrent.locks

java 早期內(nèi)置synchronized關(guān)鍵字解決多線程對(duì)共享資源訪問(wèn)的一些問(wèn)題,和其還配套了Object的notify 和 wait方法,用來(lái)控制線程之間的同步。

concurrent軟件包提供了更為高級(jí)和抽象的Lock工具,能解決更多的問(wèn)題。

Lock是控制多個(gè)線程對(duì)共享資源進(jìn)行訪問(wèn)的工具。通常Lock限定多線程對(duì)同一個(gè)共享資源訪問(wèn)的限制,一次只允許一個(gè)線程獲得Lock,即獲得對(duì)共享資源的訪問(wèn)權(quán)限,線程間是互斥的。但是也有一些鎖如果ReadWriteLock是允許部分線程同時(shí)訪問(wèn)共享資源的。

幾個(gè)術(shù)語(yǔ):

爭(zhēng)用:當(dāng)多個(gè)Thread在同一時(shí)間內(nèi)(相對(duì)概念)想要占有同一個(gè)Lock對(duì)象。那么JVM會(huì)調(diào)度解決爭(zhēng)用。

獲取順序:當(dāng)多個(gè)線程爭(zhēng)用同一個(gè)Lock對(duì)象,那么JVM就要決定哪個(gè)線程將會(huì)獲得鎖權(quán)限。存在兩種模式:公平和不公平。 默認(rèn)都是不公平模式,包括synchronized關(guān)鍵字,jvm決定順序的時(shí)候也是采用不公平策略。因?yàn)楣讲呗孕枰到y(tǒng)記錄大量輔助信息來(lái)判斷分配順序,而不公平策略由JVM決定一直快速高效的算法來(lái)分配Lock。所以不公平策略的系統(tǒng)吞吐量會(huì)比較高(花費(fèi)更少的空間和計(jì)算在分配上),如果沒有特殊需要?jiǎng)t默認(rèn)采用不公平策略。

重入:當(dāng)前線程獲取指定的鎖對(duì)象權(quán)限后,還可以再次獲取該鎖。Lock內(nèi)部會(huì)有一個(gè)計(jì)數(shù)器來(lái)表明當(dāng)前線程獲取了該鎖的數(shù)量。如果一個(gè)線程獲取了一個(gè)鎖兩次,那么線程必須釋放鎖兩次,才能被看作完全釋放了該鎖,所以編程的時(shí)候一定要注意使用重入。synchronized關(guān)鍵字也是支持重入語(yǔ)義的。

3.1 Lock & ReentrantLock

ReentrantLock實(shí)現(xiàn)了Lock接口,一個(gè)可重入(reentrant)的互斥鎖 Lock,它具有與使用 synchronized 方法和語(yǔ)句所訪問(wèn)的隱式監(jiān)視器鎖相同的一些基本行為和語(yǔ)義,但功能更強(qiáng)大。

摘自JavaDoc的一段獲取規(guī)則 “當(dāng)鎖沒有被另一個(gè)線程所擁有時(shí),調(diào)用 lock 的線程將成功獲取該鎖并返回。如果當(dāng)前線程已經(jīng)擁有該鎖,此方法將立即返回。ReentrantLock 將由最近成功獲得鎖,并且還沒有釋放該鎖的線程所擁有?!?/p>

經(jīng)典使用方法。

 
 
 
 
  1. public void m() {   
  2.     lock.lock();  // block until condition holds  
  3.     try {  
  4.       // ... method body  
  5.     } finally {  
  6.       lock.unlock()  
  7.     }  
  8.   }  
  9.  

ReentrantLock除了實(shí)現(xiàn)了Lock規(guī)定的方法外,還實(shí)現(xiàn)了tryLock、isLocked 等方法,幫助你實(shí)現(xiàn)更多的場(chǎng)景。

Condition

和Object的wait和notify方法類似。ReentrantLock對(duì)象附加了Conditon對(duì)象,用來(lái)完成掛起和喚醒操作,使用lock.newCondition() 方法生成。

一個(gè)來(lái)自JKD的例子:

 
 
 
 
  1. class BoundedBuffer {  
  2.   final Lock lock = new ReentrantLock();  
  3.   final Condition notFull  = lock.newCondition();   
  4.   final Condition notEmpty = lock.newCondition();   
  5.  
  6.   final Object[] items = new Object[100];  
  7.   int putptr, takeptr, count;  
  8.  
  9.   public void put(Object x) throws InterruptedException {  
  10.     lock.lock();  
  11.     try {  
  12.       while (count == items.length)   
  13.         notFull.await();  
  14.       items[putptr] = x;   
  15.       if (++putptr == items.length) putptr = 0;  
  16.       ++count;  
  17.       notEmpty.signal();  
  18.     } finally {  
  19.       lock.unlock();  
  20.     }  
  21.   }  
  22.  
  23.   public Object take() throws InterruptedException {  
  24.     lock.lock();  
  25.     try {  
  26.       while (count == 0)   
  27.         notEmpty.await();  
  28.       Object x = items[takeptr];   
  29.       if (++takeptr == items.length) takeptr = 0;  
  30.       --count;  
  31.       notFull.signal();  
  32.       return x;  
  33.     } finally {  
  34.       lock.unlock();  
  35.     }  
  36.   }   
  37. }  

利用Conditon對(duì)象可以讓所有對(duì)同一個(gè)鎖對(duì)象進(jìn)行爭(zhēng)用的Thread之間進(jìn)行同步。

Lock VS synchronized

除非你有明確的需求或者并發(fā)遇到瓶頸的時(shí)候再?zèng)Q定使用ReentrantLock。synchronized在大部分時(shí)候還是可以工作的很好,jvm會(huì)自動(dòng)處理和回收鎖。

ReentrantLock提供了更多的選擇和狀態(tài)信息。和

3.2 ReadWriteLock & ReentrantReadWriteLock

列舉一個(gè)場(chǎng)景對(duì)象X,擁有方法a、b、c。a和b方法不改表X的內(nèi)部狀態(tài),c改變內(nèi)部狀態(tài)。在多線程環(huán)境下,我們要求只讀和寫(變更狀態(tài))是不能同時(shí)進(jìn)行的,而只讀操作是可以同時(shí)并發(fā)的,且實(shí)際運(yùn)行過(guò)程中讀操作數(shù)量遠(yuǎn)遠(yuǎn)大于寫操作的數(shù)量。

如果用synchronized關(guān)鍵字的話,兩個(gè)只讀方法a、b也會(huì)互斥,并發(fā)性能收到限制。

那么這個(gè)情況下ReadWriteLock就非常有用,使用也非常簡(jiǎn)單。

 
 
 
 
  1. class RWDictionary {  
  2.    private final Map  m =  new TreeMap ();  
  3.    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  
  4.    private final Lock r = rwl.readLock();  
  5.    private final Lock w = rwl.writeLock();  
  6.  
  7.    public Data get(String key) {  
  8.        r.lock();  
  9.        try { return m.get(key); }  
  10.        finally { r.unlock(); }  
  11.    }  
  12.    public String[] allKeys() {  
  13.        r.lock();  
  14.        try { return m.keySet().toArray(); }  
  15.        finally { r.unlock(); }  
  16.    }  
  17.    public Data put(String key, Data value) {  
  18.        w.lock();  
  19.        try { return m.put(key, value); }  
  20.        finally { w.unlock(); }  
  21.    }  
  22.    public void clear() {  
  23.        w.lock();  
  24.        try { m.clear(); }  
  25.        finally { w.unlock(); }  
  26.    }  
  27. }  
  28.  

要記得write鎖是獨(dú)占的,它一樣可以使用ReentrantLock的Condition功能。

使用任何的鎖都要通過(guò)try catch 或者 finally 來(lái)處理異常,避免忘記unlock。

#p#

4. 同步輔助類

你提交了一些任務(wù),但你想等它們都完成了再做另外一些事情;你提交了一些任務(wù),但是不想讓它們立刻執(zhí)行,等你喊123開始的時(shí)候,它們才開始執(zhí)行;等等這些場(chǎng)景,線程之間需要相互配合,或者等待某一個(gè)條件成熟執(zhí)行。這些場(chǎng)景想你就需要用到同步輔助類。

4.1 CountDownLatch

CountDownLatch 內(nèi)部有個(gè)計(jì)數(shù)器,通過(guò)構(gòu)造函數(shù)來(lái)指定。這個(gè)類就好比是倒計(jì)時(shí)的電子牌,當(dāng)?shù)褂?jì)時(shí)為0的時(shí)候就可以一起做一些事情。

摘自JavaDoc的方法介紹

摘自JavaDoc的例子

 
 
 
 
  1. class Driver { // ...  
  2.   void main() throws InterruptedException {  
  3.     CountDownLatch startSignal = new CountDownLatch(1);  
  4.     CountDownLatch doneSignal = new CountDownLatch(N);  
  5.  
  6.     for (int i = 0; i < N; ++i) // create and start threads  
  7.       new Thread(new Worker(startSignal, doneSignal)).start();  
  8.  
  9.     doSomethingElse();            // don't let run yet  
  10.     startSignal.countDown();      // let all threads proceed  
  11.     doSomethingElse();  
  12.     doneSignal.await();           // wait for all to finish  
  13.   }  
  14. }  
  15.  
  16. class Worker implements Runnable {  
  17.   private final CountDownLatch startSignal;  
  18.   private final CountDownLatch doneSignal;  
  19.   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {  
  20.      this.startSignal = startSignal;  
  21.      this.doneSignal = doneSignal;  
  22.   }  
  23.   public void run() {  
  24.      try {  
  25.        startSignal.await();  
  26.        doWork();  
  27.        doneSignal.countDown();  
  28. } catch (InterruptedException ex) {} // return;  
  29.   }  
  30.  
  31.   void doWork() { ... }  
  32. }  
  33.  

當(dāng)CountDownLatch(1)的時(shí)候,它就好比是個(gè)信號(hào)槍了。

4.2 CyclicBarrier

 
 
 
 
  1. new CyclicBarrier(N,   
  2.             new Runnable() {  
  3.               public void run() {   
  4.                mergeRows(...);   
  5.     }  
  6. }); 

這個(gè)同步輔助類,它讓多個(gè)線程可以在多個(gè)屏障點(diǎn)進(jìn)行等待,所以叫cyclic,而且有個(gè)附加選擇你可以在線程到達(dá)屏障點(diǎn)后執(zhí)行一個(gè)任務(wù)(在釋放其他線程之前)

為了幫助你理解,假設(shè)一個(gè)場(chǎng)景。

有一個(gè)任務(wù),A、B、C分別從三個(gè)倉(cāng)庫(kù)(甲乙丙)搬運(yùn)不同3個(gè)不同的零件到客戶X的公司,然后再一起組裝機(jī)器,完成后一起坐車去公司總部。

這個(gè)任務(wù)需要ABC三個(gè)線程同時(shí)進(jìn)行,但是由于從倉(cāng)庫(kù)到客戶X那邊距離不等、交通狀態(tài)未知的情況下,所花費(fèi)的時(shí)間也不等。同時(shí)由于三個(gè)人負(fù)責(zé)的零件不同,所以安裝機(jī)器的時(shí)候花費(fèi)時(shí)間也不一樣。這個(gè)場(chǎng)景中有兩個(gè)需要線程間等待的地方。CyclicBarrier就可以閃亮登場(chǎng)了。

 
 
 
 
  1. public class Main3 {  
  2.  
  3. public static void main(String[] args) {  
  4.  
  5. CyclicBarrier barrier = new CyclicBarrier(3,new Runnable() {  
  6.  
  7. @Override 
  8.  
  9. public void run() {  
  10.  
  11. System.out.println("到達(dá)公共屏障點(diǎn)");  
  12.  
  13. }  
  14.  
  15. });  
  16.  
  17. ExecutorService es = Executors.newCachedThreadPool();  
  18.  
  19. es.submit(new Worker("A", 5000, 8000, barrier));  
  20.  
  21. es.submit(new Worker("B", 2000, 16000, barrier));  
  22.  
  23. es.submit(new Worker("C", 9000, 2000, barrier));  
  24.  
  25. es.shutdown();  
  26.  
  27. }  
  28.  
  29. static class Worker implements Runnable {  
  30.  
  31. String name;  
  32.  
  33. int t1;// 搬運(yùn)零件所需要的時(shí)間  
  34.  
  35. int t2;// 參與組裝工作需要的時(shí)間  
  36.  
  37. CyclicBarrier barrier;  
  38.  
  39. public Worker(String name, int t1, int t2, CyclicBarrier barrier) {  
  40.  
  41. super();  
  42.  
  43. this.name = name;  
  44.  
  45. this.t1 = t1;  
  46.  
  47. this.t2 = t2;  
  48.  
  49. this.barrier = barrier;  
  50.  
  51. }  
  52.  
  53. @Override 
  54.  
  55. public void run() {  
  56.  
  57. try {  
  58.  
  59. print(name + " 開始搬運(yùn)零件");  
  60.  
  61. Thread.sleep(t1);// 模擬搬運(yùn)時(shí)間  
  62.  
  63. print(name + " 到達(dá)目的地");  
  64.  
  65. int a = barrier.await(); // 等待其他人  
  66.  
  67. if(a==0){  
  68.  
  69. //說(shuō)明是最后一個(gè)到的可以執(zhí)行特殊操作  
  70.  
  71. }  
  72.  
  73. print(name + " 開始組裝機(jī)器");  
  74.  
  75. Thread.sleep(t2);// 模擬組裝時(shí)間.  
  76.  
  77. print(name + " 完成組裝機(jī)器");  
  78.  
  79. barrier.await(); // 等待其他人組裝完畢  
  80.  
  81. print(name + " 一起回總公司");  
  82.  
  83. } catch (Exception e) {  
  84.  
  85. e.printStackTrace();  
  86.  
  87. }  
  88.  
  89. }  
  90.  
  91. }  
  92.  
  93. static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
  94.  
  95. static void print(String x) {  
  96.  
  97. System.out.println( sdf.format(new Date()) + ": "+x);  
  98.  
  99. }  
  100.  
  101. }  

4.3 Semaphore

一個(gè)經(jīng)典的信號(hào)量計(jì)數(shù)器。一般被用來(lái)控制對(duì)共享資源同時(shí)訪問(wèn)線程數(shù)量的控制。

特殊情況下信號(hào)量設(shè)置為1,那么就類似互斥鎖的功能。

此類的構(gòu)造方法可選地接受一個(gè)公平 參數(shù)。當(dāng)設(shè)置為 false 時(shí),此類不對(duì)線程獲取鎖的順序做任何保證。和之前提到的爭(zhēng)用獲取順序一樣,在非公平模式下,系統(tǒng)將獲得更好的吞吐量,jvm也會(huì)保證在非公平模式下讓所有線程得到訪問(wèn)機(jī)會(huì)。

【編輯推薦】

  1. java.util.concurrent 您不知道的 5 件事
  2. util.concurrent移植到C#

分享題目:ConcurrentInJava
網(wǎng)站路徑:http://www.5511xx.com/article/dhcehgh.html