新聞中心
今天這篇文章,我們繼續(xù)講架構(gòu)師大劉的故事。

大劉有段時(shí)間經(jīng)常會(huì)給一些程序員講課。這一方面是由于團(tuán)隊(duì)培訓(xùn)的需要,一方面也是大劉自身想搞搞凡爾賽,嘚瑟一下自身的實(shí)力。
大劉講課是允許公司任何一個(gè)人進(jìn)去聽的。提前一個(gè)星期把主題公布在公司群里,有人想聽到日子直接去就是了。
有一次,大劉在聊并發(fā)話題的時(shí)候,為了彰顯自己確實(shí)是個(gè)并發(fā)達(dá)人,用了個(gè) SynchronousQueue 舉例子。他說這個(gè)隊(duì)列其實(shí)沒有容積的概念,就是線程持有數(shù)據(jù)互相匹配。
嗯,談到這里還是要說一下,大劉其實(shí)也不太懂 SynchronousQueue。只是一來這東西沒人用,自然就沒人懂;二來它的概念也比較晦澀,有些時(shí)候比較違背直覺,所以,即使隨口說的一些話可能不太對,也未必會(huì)被發(fā)現(xiàn),還能給人一種不明覺厲的感覺。
大劉用過幾次,感覺良好。因此沒事兒就要秀一下 SynchronousQueue,表示自己這么生僻的也懂,并發(fā)達(dá)人的名頭是沒有叫錯(cuò)的。
也就那一次,恰恰被人拆了臺。
當(dāng)時(shí)課上來了個(gè)新入職的技術(shù),此人長得中等身材,相貌平平,只是臉卻長的像種地多年的老農(nóng)的巴掌。臉上的疙瘩如同老農(nóng)巴掌上的老繭。這人姓張,這里由于他臉長得像個(gè)大巴掌,那就暫且叫他巴掌張。
這個(gè)巴掌張打斷了大劉的話,言之鑿鑿說大劉說的是錯(cuò)的,說他看過這個(gè) SynchronousQueue,并不是大劉說的這樣。
大劉有點(diǎn)心虛,脖子滲出了一圈汗,但是并發(fā)達(dá)人的稱呼大劉并不想丟掉。于是說了一大堆云里霧里的廢話,把話題帶偏了開去。并告訴巴掌張,下回要和他在這個(gè)舞臺上 PK 一二, 要好好看看誰是真正的 SynchronousQueue 的知心朋友。
由于大劉感覺被巴掌張的巴掌糊了臉,便就此下了決心要研究透 SynchronousQueue。
Google 和百度一起查,東西合璧,洋為中用,搞了好是一陣子。最后有個(gè)犄角旮旯的小破網(wǎng)站,有人說了這么一句話:
SynchronousQueue 的目的就是為了接頭,為了匹配,當(dāng)接上頭了就雙方合作愉快,整個(gè)工作完成。但是一旦在接頭中,任何一方還沒到達(dá),那么另一方就必須阻塞著等待。
這句話一下子就敲開了大劉的腦殼,讓聰明的智商重新占領(lǐng)了高地。
為啥這句話就點(diǎn)亮了大劉那本來已經(jīng)像燈泡的腦袋了呢?因?yàn)榇髣⑾肫鹆怂看蔚拿嬖嚱?jīng)歷,就和這個(gè)接頭是一樣的。
大劉每次去面試,都很規(guī)矩的提前趕到新公司。但是大部分情況,時(shí)間到了之后都需要等很長時(shí)間才開始面試。大劉那時(shí)候也年輕,只是以為領(lǐng)導(dǎo)忙,所以倒也恭恭敬敬的等著。
直到大劉自己當(dāng)了領(lǐng)導(dǎo),去面試別人的時(shí)候,被 HR 委婉的提醒了下,要讓候選人等一會(huì)兒再過去,顯的公司業(yè)務(wù)很忙,讓候選人對公司保持一定的敬畏。那時(shí)候,大劉才知道這是一種 PUA 術(shù)……
大劉對照著自己的面試經(jīng)歷,一下就理解了 SynchronousQueue 的概念。
SynchronousQueue 本身是為了交接、匹配而存在的。當(dāng)一個(gè)線程往 SynchronousQueue 放東西,發(fā)現(xiàn)沒線程在等著拿,就給阻塞掉——這就像面試者來早了等面試官。
當(dāng)一個(gè)線程去 SynchronousQueue 拿東西,發(fā)現(xiàn)沒東西,就去等的時(shí)候——就像面試官來早了等面試者。
搞懂 SynchronousQueue 的時(shí)候,正是一個(gè)冬天,屋外面的寒風(fēng)在虎虎生威,屋里面的大劉在熠熠生輝。
只是一個(gè)堂而皇之?dāng)[在 JDK 底層并發(fā)包中的隊(duì)列結(jié)構(gòu),SynchronousQueue 當(dāng)然沒那么簡單,里面還存在著億點(diǎn)點(diǎn)細(xì)節(jié)。
所以,大劉在整體方向搞懂之后,開始研究起了細(xì)節(jié)。他要奮發(fā),狠狠把巴掌張的囂張氣焰壓下去,大劉要當(dāng)公司技術(shù)的頭牌。
回到現(xiàn)實(shí)里,SynchronousQueue 真正的目的就是為了讓兩個(gè)線程的工作結(jié)果進(jìn)行交接。這沒什么問題。但是,在這個(gè)交接中是需要嚴(yán)格保密的,沒有人可以窺視。
嗯,沒錯(cuò),就和你約了女朋友去鐘點(diǎn)房那樣的不能被窺視。
好,圍繞這個(gè) SynchronousQueue 的鐘點(diǎn)房,咱們通過源代碼,來看這億點(diǎn)點(diǎn)細(xì)節(jié)。
首先,鐘點(diǎn)房嚴(yán)格保密,里面是多少人,就不能讓人知道。所以,就不能讓別人通過方法得到具體的數(shù)據(jù)。對于 SynchronousQueue 來說,自然就是通過 size() 你得不到什么信息。
- /**
- * Always returns zero.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @return zero
- */
- public int size() {
- return 0;
- }
- /**
- * Always returns {@code true}.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @return {@code true}
- */
- public boolean isEmpty() {
- return true;
- }
其次,鐘點(diǎn)房也不能隨便進(jìn)去查房,看看都是誰。所以,自然就不能迭代。
- /**
- * Returns an empty iterator in which {@code hasNext} always returns
- * {@code false}.
- *
- * @return an empty iterator
- */
- public Iterator
iterator() { - return Collections.emptyIterator();
- }
再次,鐘點(diǎn)房保護(hù)隱私,它也不能讓你鉆了漏子,不告訴你 XXX 是不是躲在了鐘點(diǎn)房里。所以,你也不能知道鐘點(diǎn)房里有沒有這個(gè)人。
- /**
- * Always returns {@code false}.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @param o the element
- * @return {@code false}
- */
- public boolean contains(Object o) {
- return false;
- }
- /**
- * Returns {@code false} unless the given collection is empty.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @param c the collection
- * @return {@code false} unless given collection is empty
- */
- public boolean containsAll(Collection> c) {
- return c.isEmpty();
- }
自然,鐘點(diǎn)房也沒什么權(quán)力趕人出去。
- /**
- * Always returns {@code false}.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @param o the element to remove
- * @return {@code false}
- */
- public boolean remove(Object o) {
- return false;
- }
當(dāng)然,作為一個(gè)商業(yè)化的鐘點(diǎn)房,SynchronousQueue 還是很注意安全的,它貼心的提供了緊急轉(zhuǎn)移的手段。
- /**
- * @throws UnsupportedOperationException {@inheritDoc}
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- * @throws IllegalArgumentException {@inheritDoc}
- */
- public int drainTo(Collection super E> c) {
- if (c == null)
- throw new NullPointerException();
- if (c == this)
- throw new IllegalArgumentException();
- int n = 0;
- for (E e; (e = poll()) != null;) {
- c.add(e);
- ++n;
- }
- return n;
- }
- /**
- * @throws UnsupportedOperationException {@inheritDoc}
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- * @throws IllegalArgumentException {@inheritDoc}
- */
- public int drainTo(Collection super E> c, int maxElements) {
- if (c == null)
- throw new NullPointerException();
- if (c == this)
- throw new IllegalArgumentException();
- int n = 0;
- for (E e; n < maxElements && (e = poll()) != null;) {
- c.add(e);
- ++n;
- }
- return n;
- }
最后,鐘點(diǎn)房就只能搞搞交接工作了。交接嗎,自然是有交有接的,交的就得帶東西。
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- // put:帶著東西進(jìn)屋子
- if (transferer.transfer(e, false, 0) == null) {
- Thread.interrupted();
- throw new InterruptedException();
- }
- }
接的肯定不會(huì)帶著東西,得留地方拿東西。
- public E take() throws InterruptedException {
- // take:從屋子里把東西拿出來
- E e = transferer.transfer(null, false, 0);
- if (e != null)
- return e;
- Thread.interrupted();
- throw new InterruptedException();
- }
但是呢,這交接工作啊,得在專人安排下進(jìn)行。
為什么需要專人來幫忙?因?yàn)橛袝r(shí)候我們的鐘點(diǎn)房太受歡迎了,客人多,得排隊(duì)管管。管這些排隊(duì)的就是 Transfer,它是鐘點(diǎn)房的經(jīng)理。
- /**
- * The transferer. Set only in constructor, but cannot be declared
- * as final without further complicating serialization. Since
- * this is accessed only at most once per public method, there
- * isn't a noticeable performance penalty for using volatile
- * instead of final here.
- */
- private transient volatile Transferer
transferer; - /**
- * Shared internal API for dual stacks and queues.
- */
- abstract static class Transferer
{ - /**
- * Performs a put or take.
- *
- * @param e if non-null, the item to be handed to a consumer;
- * if null, requests that transfer return an item
- * offered by producer.
- * @param timed if this operation should timeout
- * @param nanos the timeout, in nanoseconds
- * @return if non-null, the item provided or received; if null,
- * the operation failed due to timeout or interrupt --
- * the caller can distinguish which of these occurred
- * by checking Thread.interrupted.
- */
- abstract E transfer(E e, boolean timed, long nanos);
- }
Transfer 經(jīng)理每次開門營業(yè)的時(shí)候,會(huì)收到總部給的牌子,告訴他管理工作要注意方式方法,比如公平有效,比如優(yōu)先服務(wù) VIP 客人之類的。
- /**
- * 默認(rèn)給vip客人開點(diǎn)后門
- */
- public SynchronousQueue() {
- this(false);
- }
- /**
- * 總部遞牌子,告訴Transfer到底是公平還是不公平,
- */
- public SynchronousQueue(boolean fair) {
- transferer = fair ? new TransferQueue
() : new TransferStack (); - }
先看看適合勞苦大眾的公平模式,先來先享受,晚來沒折扣。
- static final class TransferQueue
extends Transferer { - static final class QNode{...}
- transient volatile QNode head;
- transient volatile QNode tail;
- transient volatile QNode cleanMe;
- TransferQueue() {
- //經(jīng)典的鏈表套路,先搞個(gè)虛擬的頭結(jié)點(diǎn)
- QNode h = new QNode(null, false);
- head = h;
- tail = h;
- }
- ……
- ……
QNode 就是 Transfer 經(jīng)理需要的牌子,上面記錄點(diǎn)信息,別到時(shí)候弄錯(cuò)了。
- static final class QNode {
- volatile QNode next; // 下一個(gè)排隊(duì)的哥們兒
- volatile Object item; // 這次哥們帶來的要交接的東西
- volatile Thread waiter; // 交接的線程
- final boolean isData; // isData == true表示帶著東西
- QNode(Object item, boolean isData) {
- this.item = item;
- this.isData = isData;
- }
- // ...省略一系列CAS方法
- }
怎么搞,秘密都在 transfer() 里。
- @SuppressWarnings("unchecked")
- E transfer(E e, boolean timed, long nanos) {
- //...先省略細(xì)節(jié)
- }
transfer 本質(zhì)就是一直在等待交接完成或者交接被中斷,被取消,或者等待超時(shí)。
- for (;;) {
- QNode t = tail;
- QNode h = head;
- //因?yàn)槌跏蓟窃跇?gòu)造函數(shù)里搞得,可能構(gòu)造函數(shù)沒有執(zhí)行完,就被用上了,就會(huì)出現(xiàn)t或者h(yuǎn)為null的情況
- if (t == null || h == null)
- continue; //啥也不能做
- //h==t表示沒人,t.isData == isData表示過來的哥們和前面的哥們目的一樣,那就只能考慮排隊(duì)等著了。
- if (h == t || t.isData == isData) {
- QNode tn = t.next;
- //線程不安全需要考慮的,現(xiàn)在的尾巴不對,指錯(cuò)了,重新確認(rèn)下
- if (t != tail)
- continue;
- //隊(duì)尾確定了,發(fā)現(xiàn)又來了人,把尾巴指向新來的人
- if (tn != null) {
- advanceTail(t, tn);
- continue;
- }
- //超時(shí)了,別等了
- if (timed && nanos <= 0)
- return null;
- //總算沒事兒了,哥們可以登記進(jìn)屋了
- if (s == null)
- s = new QNode(e, isData);
- //中間可能有人插隊(duì),只能再等等
- if (!t.casNext(null, s))
- continue;
- //準(zhǔn)備進(jìn)屋等著約的人
- advanceTail(t, s);
- Object x = awaitFulfill(s, e, timed, nanos);
- //同一個(gè)人出來,那就是任務(wù)失敗了
- if (x == s) {
- //清理下
- clean(t, s);
- return null;
- }
- if (!s.isOffList()) { //還沒脫隊(duì)
- advanceHead(t, s); //排前面單獨(dú)處理
- if (x != null) //交接成功設(shè)一下標(biāo)記
- s.item = s;
- s.waiter = null;
- }
- return (x != null) ? (E)x : e;
這段是不是看著很頭痛?其實(shí) Transfer 這小子也頭痛。
它首先要面臨的第一個(gè)問題:資源競爭的問題。
客人源源不斷的來,由于 Transfer 強(qiáng)迫癥,他想每次必須從絕對的隊(duì)頭或者隊(duì)尾巴開始,所以,每次都要判斷下,到底他看到的隊(duì)頭或者隊(duì)尾,是不是真正的隊(duì)頭、隊(duì)尾。
確定沒問題了,新來的客人就開始被打造成真正的隊(duì)尾。
然后,成為隊(duì)尾的哥們就可以等著屬于自己的 Mr.Right 過來交接了。等著交接一直到成功或者失敗的方法就是 awaitFulfill(t, tn)。
這邊有人在等待,同時(shí)另外一邊,交接的人們也開始陸續(xù)過來了。
- else { // complementary-mode
- QNode m = h.next; // node to fulfill
- if (t != tail || m == null || h != head)
- continue; // inconsistent read
- Object x = m.item;
- if (isData == (x != null) || // m already fulfilled
- x == m || // m cancelled
- !m.casItem(x, e)) { // 交接的核心語句
- advanceHead(h, m); // dequeue and retry
- continue;
- }
- advanceHead(h, m); // successfully fulfilled
- LockSupport.unpark(m.waiter);
- return (x != null) ? (E)x : e;
- }
交接最核心的其實(shí)就是 m.casItem(x, e)。交接成功,大家各回各家了。
整體的流程如下:
開始就是個(gè)經(jīng)典鏈表開局,head = tail
陸續(xù)開始有節(jié)點(diǎn)鏈接,put 的時(shí)候,isData = true;take 的時(shí)候,isData = false
可能會(huì)同時(shí)有很多的 put 操作,沒有對應(yīng)的 take 操作,他們就按照次序一個(gè)個(gè)鏈接起來,形成鏈表,并通過 awaitFulfill 方法等著對應(yīng)的 take
也可能同時(shí)會(huì)有很多的 take 操作,而沒有對應(yīng)的 put 操作,會(huì)形成鏈表,并通過 awaitFulfill 方法等著對應(yīng)的 put
take 操作會(huì)從鏈表頭開始找匹配的 put,然后通過 casItem 方法交接
put 操作會(huì)從鏈表頭開始找匹配的 take,然后通過 casItem 方法交接
所以,SynchronousQueue 你可以看到了,專門就是搞交接任務(wù)。
- put 的哥們發(fā)現(xiàn)沒人 take,就等在那里,等著take操作。
- take的哥們兒發(fā)現(xiàn)沒人put,也會(huì)等在那里,等著put操作。
這就是我們的 SynchronousQueue 鐘點(diǎn)房做的事情。
OK,鐘點(diǎn)房既然開門做生意,它也要賺錢的嘛。所以,它還得搞搞 VIP 客戶收費(fèi),也得為 VIP 客戶搞一些優(yōu)待。
對于這些 VIP 客人,我們的 Transfer 經(jīng)理會(huì)特意安排下,以棧的形式來安排客人,越后來的客人越大牌兒。所以,自然是后來的客人會(huì)優(yōu)先搞定交接了。這里簡短的介紹下,就不再贅述了。
Transfer 化身成 TransferStack,后來的優(yōu)先服務(wù)。
開始自然是鏈表開局,一個(gè)無意義的鏈表頭指向了 null
發(fā)現(xiàn)鏈表是空了,二話不說,客官,您進(jìn)來先啦
和 TransferQueue 一樣,如果都是 take 過來,模式就是 REQUEST,就得排隊(duì)了
交接人出現(xiàn),哥們可以收攤兒了
其余的不說了,一樣的,說多了沒勁
話說,大劉搞清楚了這些細(xì)節(jié)之后,次日,當(dāng)巴掌張?jiān)俅芜M(jìn)行挑釁時(shí),大劉徹底穩(wěn)下來了。
當(dāng)挨個(gè)把細(xì)節(jié)講的一清二楚之后,看著巴掌張那張落寞的巴掌臉,瞬間也不覺得像巴掌了,而是像是在猜拳中出的石頭剪刀布中的布。大劉沒忍住,對著這個(gè)布比劃出了個(gè)剪刀,光榮的結(jié)束了戰(zhàn)斗。
大劉依然在技術(shù)流中獨(dú)占鰲頭。
我們下篇大劉的故事見。
本文轉(zhuǎn)載自微信公眾號「四猿外」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系四猿外公眾號
新聞名稱:面試官:啥?SynchronousQueue是鐘?點(diǎn)?房?
本文鏈接:http://www.5511xx.com/article/dpogdje.html


咨詢
建站咨詢
