日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
并發(fā)編程之Phaser原理與應(yīng)用

 前言

成都創(chuàng)新互聯(lián)公司專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、久治網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5頁面制作商城網(wǎng)站開發(fā)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)營銷網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為久治等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

JDK5中引入了CyclicBarrier和CountDownLatch這兩個(gè)并發(fā)控制類,而JDK7中引入的Phaser按照官方的說法是提供了一個(gè)功能類似但是更加靈活的實(shí)現(xiàn)。接下來我們帶著幾個(gè)問題來研究一下Phaser與(CountDownLath、CyclicBarrier)到底有哪些類似,同時(shí)帶來了哪些靈活性?

  1. Phaser 是什么?
  2. Phaser 具有哪些特性?
  3. Phaser相對于 CyclicBarrier 和 CountDownLatch的優(yōu)勢?

CyclicBarrier和CountDownLatch

CyclicBarrier介紹

在使用CyclicBarrier時(shí),需要?jiǎng)?chuàng)建一個(gè)CyclicBarrier對象,構(gòu)造函數(shù)需要一個(gè)整數(shù)作為參數(shù),這個(gè)參數(shù)是一個(gè)“目標(biāo)”,在CyclicBarrier對象創(chuàng)建后,內(nèi)部會(huì)有一個(gè)計(jì)數(shù)器,初始值為0,CyclicBarrier對象的await方法每被調(diào)用一次,這個(gè)計(jì)數(shù)器就會(huì)加1,一旦這個(gè)計(jì)數(shù)器的值達(dá)到設(shè)定的“目標(biāo)”,所有被CyclicBarrier.await阻塞住的線程都會(huì)繼續(xù)執(zhí)行。這個(gè)目標(biāo)是固定的,一旦設(shè)定便不能修改。

舉一個(gè)例子,假設(shè)有5個(gè)人爬香山,他們要爬到山頂,等到5個(gè)人到齊了再同時(shí)出發(fā)下山,那么我們要在山頂設(shè)定一個(gè)“目標(biāo)”,同時(shí)還有一個(gè)計(jì)數(shù)器,這個(gè)目標(biāo)就是5,每一個(gè)人到山頂后,這個(gè)人就要等待,同時(shí)計(jì)數(shù)器加1,等到5個(gè)人到齊了,也就是計(jì)數(shù)器達(dá)到了這個(gè)“目標(biāo)”,所有等待的人就開始下山了。 更多內(nèi)容請閱讀《并發(fā)編程之CyclicBarrier原理與使用》

CountDownLathch介紹

使用CountDownLatch時(shí),需要?jiǎng)?chuàng)建一個(gè)CountDownLatch對象,構(gòu)造函數(shù)也需要一個(gè)整數(shù)作為參數(shù),可以把這個(gè)參數(shù)想象成一個(gè)倒計(jì)時(shí)器,CountDownLatch對象本身是一個(gè)發(fā)令槍,所有調(diào)用CountDownLatch.await方法的線程都會(huì)等待發(fā)令槍的指令,一旦倒計(jì)時(shí)器為0,這些線程同時(shí)開始執(zhí)行,而CountDownLatch.countDown方法就是為倒計(jì)時(shí)器減1。

更多內(nèi)容請閱讀《并發(fā)編程之CountDownLatch原理與使用》

對比分析

CyclicBarrier和CountDownLatch的共同點(diǎn)都是有一個(gè)目標(biāo)和一個(gè)計(jì)數(shù)器,等到計(jì)數(shù)器達(dá)到目標(biāo)后,所有阻塞的線程都將繼續(xù)執(zhí)行。它們的不同點(diǎn)是CyclicBarrier.await在等待的同時(shí)還修改計(jì)數(shù)器,而CountDownLatch.await只負(fù)責(zé)等待,CountDownLatch.countDown才修改計(jì)數(shù)器。

CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點(diǎn)不同:

  • CountDownLatch一般用于一個(gè)或多個(gè)線程,等待其他線程執(zhí)行完任務(wù)后,再才執(zhí)行;
  • CyclicBarrier一般用于一組線程互相等待至某個(gè)狀態(tài),然后這一組線程再同時(shí)執(zhí)行;
  • CountDownLatch 是一次性的,CyclicBarrier 是可循環(huán)利用的;
  • CountDownLathch是一個(gè)計(jì)數(shù)器,線程完成一個(gè)記錄一個(gè),計(jì)數(shù)器遞減,只能用一次。如下圖:

CyclicBarrier的計(jì)數(shù)器更像一個(gè)閥門,需要所有線程都到達(dá),然后繼續(xù)執(zhí)行,計(jì)數(shù)器遞減,提供reset功能,可以多次使用。如下圖:

Phaser是什么?

Phaser,翻譯為移相器(階段),它適用于這樣一種場景,一個(gè)大任務(wù)可以分為多個(gè)階段完成,且每個(gè)階段的任務(wù)可以多個(gè)線程并發(fā)執(zhí)行,但是必須上一個(gè)階段的任務(wù)都完成了才可以執(zhí)行下一個(gè)階段的任務(wù)。

這種場景雖然使用CyclicBarrier 或者 CountDownLatch 也可以實(shí)現(xiàn),但是要復(fù)雜的多,首先,具體需要多少個(gè)階段是可能變的,其次,每個(gè)階段的任務(wù)數(shù)也可能會(huì)變的。相比于CyclicBarrier 和 CountDownLath ,Phaser更加靈活更加方便。

Phaser使用方法

Phaser同時(shí)包含CyclicBarrier和CountDownLatch兩個(gè)類的功能。

  • Phaser的arrive方法將將計(jì)數(shù)器加1,awaitAdvance將線程阻塞,直到計(jì)數(shù)器達(dá)到目標(biāo),這兩個(gè)方法與CountDownLatch的countDown和await方法相對應(yīng);
  • Phaser的arriveAndAwaitAdvance方法將計(jì)數(shù)器加1的同時(shí)將線程阻塞,直到計(jì)數(shù)器達(dá)到目標(biāo)后繼續(xù)執(zhí)行,這個(gè)方法對應(yīng)CyclicBarrier的await方法。

除了包含以上兩個(gè)類的功能外,Phaser還提供了更大的靈活性。CyclicBarrier和CountdownLatch在構(gòu)造函數(shù)指定目標(biāo)后就無法修改,而Phaser提供了register和deregister方法可以對目標(biāo)進(jìn)行動(dòng)態(tài)修改。

下面看一個(gè)最簡單的使用案例:

 
 
 
 
  1. package com.niuh.tools;
  2. import java.util.concurrent.Phaser;
  3. /**
  4.  * 

  5.  * Phaser示例
  6.  * 

  7.  */
  8. public class PhaserRunner {
  9.     // 定義每個(gè)階段需要執(zhí)行3個(gè)小任務(wù)
  10.     public static final int PARTIES = 3;
  11.     // 定義需要4個(gè)階段完成的大任務(wù)
  12.     public static final int PHASES = 4;
  13.     public static void main(String[] args) {
  14.         Phaser phaser = new Phaser(PARTIES) {
  15.             @Override
  16.             protected boolean onAdvance(int phase, int registeredParties) {
  17.                 System.out.println("==phase: " + phase + " finished==");
  18.                 return super.onAdvance(phase, registeredParties);
  19.             }
  20.         };
  21.         for (int i = 0; i < PARTIES; i++) {
  22.             new Thread(() -> {
  23.                 for (int j = 0; j < PHASES; j++) {
  24.                     System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));
  25.                     phaser.arriveAndAwaitAdvance();
  26.                 }
  27.             }, "Thread " + i).start();
  28.         }
  29.     }
  30. }

 這里我們定義個(gè)需要4個(gè)階段完成的大任務(wù),每個(gè)階段需要3個(gè)小任務(wù),針對這些小任務(wù),我們分別起3個(gè)線程來執(zhí)行這些小任務(wù),查看輸出結(jié)果為:

  • Thread 2: phase: 0
  • Thread 0: phase: 0
  • Thread 1: phase: 0
  • ==phase: 0 finished==
  • Thread 2: phase: 1
  • Thread 1: phase: 1
  • Thread 0: phase: 1
  • ==phase: 1 finished==
  • Thread 1: phase: 2
  • Thread 2: phase: 2
  • Thread 0: phase: 2
  • ==phase: 2 finished==
  • Thread 1: phase: 3
  • Thread 0: phase: 3
  • Thread 2: phase: 3
  • ==phase: 3 finished==

可以看到,每個(gè)階段都是三個(gè)線程都完成來才進(jìn)入下一個(gè)階段。這是怎么實(shí)現(xiàn)的呢?

Phaser原理猜測

結(jié)合AQS的原理,大概猜測一下Phaser的實(shí)現(xiàn)原理:

  • 首先,需要存儲(chǔ)當(dāng)前階段phase、當(dāng)前階段的任務(wù)數(shù)(參與者)parties、未完成參與者的數(shù)量,這三個(gè)變量我們可以放在一個(gè)變量state中存儲(chǔ)。
  • 其次,需要一個(gè)隊(duì)列存儲(chǔ)先完成的參與者,當(dāng)最后一個(gè)參與者完成任務(wù)時(shí),需要喚醒隊(duì)列中的參與者。

結(jié)合上面的案例帶入:初始時(shí)當(dāng)前階段為0,參與者為3個(gè),未完成參與者數(shù)為3;

  • 第一個(gè)線程執(zhí)行到 phaser.arriveAndAwaitAdvance(); 時(shí)進(jìn)入隊(duì)列;
  • 第二個(gè)線程執(zhí)行到 phaser.arriveAndAwaitadvance(); 時(shí)進(jìn)入隊(duì)列;
  • 第三個(gè)線程執(zhí)行到 phaser.arriveAndAwaitadvance(); 時(shí)先執(zhí)行這個(gè)階段的總結(jié) onAdvance(), 再喚醒簽名兩個(gè)線程繼續(xù)執(zhí)行下一個(gè)階段的任務(wù)。

基于這樣的一個(gè)思路,整體能說的通,至于是不是這樣?讓我們一起來看源碼吧。

Phaser源碼分析

主要API

  1. register(),增加一個(gè)參與者,需要同時(shí)增加parties和unarrived兩個(gè)數(shù)值,也就是state中的16位和低16位
  2. onAdvance(int phase, int registeredParties),當(dāng)前階段所有線程完成時(shí),會(huì)調(diào)用OnAdvance()
  3. bulkRegister(int parties),指定參與者數(shù)目注冊到Phaser中,同時(shí)增加parties和unarrived兩個(gè)數(shù)值
  4. arrive(),作用使parties值加1,并且不在屏障處等待,直接運(yùn)行下面的代碼
  5. awaitAdvance(int phase),如果傳入的參數(shù)與當(dāng)前階段一致,這個(gè)方法會(huì)將當(dāng)前線程置于休眠,直到這個(gè)階段的參與者都完成運(yùn)行。如果傳入的階段參數(shù)與當(dāng)前階段不一致,立即返回
  6. arriveAndAwaitAdvance(),當(dāng)前線程當(dāng)前階段執(zhí)行完畢,等待其它線程完成當(dāng)前階段
  7. arriveAndDeregister(),當(dāng)一個(gè)線程調(diào)用來此方法時(shí),parties將減1,并且通知這個(gè)線程已經(jīng)完成來當(dāng)前預(yù)警,不會(huì)參加到下一個(gè)階段中,因此Phaser對象在開始下一個(gè)階段時(shí)不會(huì)等待這個(gè)線程。
  8. awaitAdvanceInterruptibly(int phase),這個(gè)方法跟awaitAdvance(int phase)一樣,不同之處是,如果這個(gè)方法中休眠的線程被中斷,它將拋出InterruptedException異常。
  9. getPhase(),當(dāng)前階段
  10. getRegisteredParties(),總數(shù)
  11. getArrivedParties(),到達(dá)總數(shù)
  12. getUnarrivedParties(),未到達(dá)總數(shù)

內(nèi)部類QNode

QNode用來跟蹤當(dāng)前線程的信息的。QNode被組織成單向鏈表的形式。用來管理是否阻塞或者被中斷。

QNode繼承自ForkJoinPool.ManagedBlocker。ForkJoinPool來管理是否阻塞和中斷狀態(tài)。這里只需要重寫isReleasable和block。

  • isReleaseable用于判斷是否釋放當(dāng)前節(jié)點(diǎn)。
  • block用于阻塞。
 
 
 
 
  1. static final class QNode implements ForkJoinPool.ManagedBlocker {
  2.         final Phaser phaser;
  3.         final int phase;
  4.         final boolean interruptible;
  5.         final boolean timed;
  6.         boolean wasInterrupted;
  7.         long nanos;
  8.         final long deadline;
  9.         volatile Thread thread; // nulled to cancel wait
  10.         QNode next;
  11.         QNode(Phaser phaser, int phase, boolean interruptible,
  12.               boolean timed, long nanos) {
  13.             this.phaser = phaser;
  14.             this.phase = phase;
  15.             this.interruptible = interruptible;
  16.             this.nanos = nanos;
  17.             this.timed = timed;
  18.             this.deadline = timed ? System.nanoTime() + nanos : 0L;
  19.             thread = Thread.currentThread();
  20.         }
  21.         public boolean isReleasable() {
  22.             if (thread == null)
  23.                 return true;
  24.             if (phaser.getPhase() != phase) {
  25.                 thread = null;
  26.                 return true;
  27.             }
  28.             if (Thread.interrupted())
  29.                 wasInterrupted = true;
  30.             if (wasInterrupted && interruptible) {
  31.                 thread = null;
  32.                 return true;
  33.             }
  34.             if (timed) {
  35.                 if (nanos > 0L) {
  36.                     nanos = deadline - System.nanoTime();
  37.                 }
  38.                 if (nanos <= 0L) {
  39.                     thread = null;
  40.                     return true;
  41.                 }
  42.             }
  43.             return false;
  44.         }
  45.         public boolean block() {
  46.             if (isReleasable())
  47.                 return true;
  48.             else if (!timed)
  49.                 LockSupport.park(this);
  50.             else if (nanos > 0L)
  51.                 LockSupport.parkNanos(this, nanos);
  52.             return isReleasable();
  53.         }
  54.     }

 整體代碼比較簡單。要注意的是在isReleasable中使用了thread=null來使得避免解鎖任務(wù)。使用方法類似于internalAwaitAdvance中的用法。先完成的參與者放入隊(duì)列中的節(jié)點(diǎn),這里我們只需要關(guān)注 thread 和 next兩個(gè)屬性即可,很明顯這是一個(gè)單鏈表,存儲(chǔ)這入隊(duì)的線程。

主要屬性

 
 
 
 
  1. /*
  2.  * unarrived  -- 還沒有抵達(dá)屏障的參與者的個(gè)數(shù) (bits 0-15)
  3.  * parties    -- 需要等待的參與者的個(gè)數(shù)      (bits 16-31)
  4.  * phase      -- 屏障所處的階段             (bits 32-62)
  5.  * terminated -- 屏障的結(jié)束標(biāo)記             (bit 63 / sign)
  6.  */
  7. // 狀態(tài)變量,用于存儲(chǔ)當(dāng)前階段phase、參與者數(shù)parties、未完成的參與者數(shù)unarrived_count 
  8. private volatile long state;
  9. // 最多可以有多少個(gè)參與者,即每個(gè)階段最多有多少個(gè)任務(wù)
  10. private static final int  MAX_PARTIES     = 0xffff;
  11. // 最多可以有多少階段
  12. private static final int  MAX_PHASE       = Integer.MAX_VALUE;
  13. // 參與者數(shù)量的偏移量
  14. private static final int  PARTIES_SHIFT   = 16;
  15. // 當(dāng)前階段的偏移量
  16. private static final int  PHASE_SHIFT     = 32;
  17. // 未完成的參與者數(shù)的掩碼,低16位
  18. private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
  19. // 參與者數(shù),中間16位
  20. private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
  21. // counts的掩碼,counts等于參與者數(shù)和未完成的參與者數(shù)的 '|' 操作
  22. private static final long COUNTS_MASK     = 0xffffffffL;
  23. private static final long TERMINATION_BIT = 1L << 63;
  24. // 一次一個(gè)參與者完成
  25. private static final int  ONE_ARRIVAL     = 1;
  26. // 增加減少參與者時(shí)使用
  27. private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
  28. // 減少參與者時(shí)使用
  29. private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
  30. // 沒有參與者使用
  31. private static final int  EMPTY           = 1;
  32. // 用于求未完成參與者數(shù)量
  33. private static int unarrivedOf(long s) {
  34.  int counts = (int)s;
  35.     return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
  36. }
  37. // 用于求參與者數(shù)量(中間16位),注意int的為止
  38. private static int partiesOf(long s) {
  39.  return (int)s >>> PARTIES_SHIFT;
  40. }
  41. // 用于求階段數(shù)(高32位),注意int的位置
  42. private static int phaseOf(long s) {
  43.  return (int)(s >>> PHASE_SHIFT);
  44. }
  45. // 已完成參與者數(shù)量
  46. private static int arrivedOf(long s) {
  47.  int counts = (int)s;
  48.  return (counts == EMPTY) ? 0 :
  49.   (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
  50. }
  51. /**
  52. * The parent of this phaser, or null if none
  53. */
  54. private final Phaser parent;
  55. /**
  56. * The root of phaser tree. Equals this if not in a tree.
  57. */
  58. private final Phaser root;
  59. // 用于存儲(chǔ)已經(jīng)=完成參與者所在的線程,根據(jù)當(dāng)前階段的奇偶性選擇不同的隊(duì)列
  60. private final AtomicReference evenQ;
  61. private final AtomicReference oddQ;

主要屬性位 state 和 evenQ 及 oddQ

  • state,volatile的long來表示狀態(tài)變量,高32位存儲(chǔ)當(dāng)前階段phase,中間16位存儲(chǔ)參與者的數(shù)量,低16位存儲(chǔ)未完成參與者的數(shù)量。
  • unarrived -- 還沒有抵達(dá)屏障的參與者的個(gè)數(shù) (bits 0-15)
  • parties -- 需要等待的參與者的個(gè)數(shù) (bits 16-31)
  • phase -- 屏障所處的階段 (bits 32-62)
  • terminated -- 屏障的結(jié)束標(biāo)記 (bit 63 / sign)

如果是空狀態(tài),也就是沒有子階段注冊的初始階段。這里用一個(gè)EMPTY狀態(tài)表示,也就是0個(gè)子階段和一個(gè)未到達(dá)階段。

所有的狀態(tài)變化都是通過CAS操作執(zhí)行的,唯一例外是注冊一個(gè)子相移器(sub-Phaser),用于構(gòu)成樹的,也就是Phaser的父Phaser非空。這個(gè)子相移器的分階段是通過一個(gè)內(nèi)置鎖來設(shè)置。

  • evenQ 和 oddQ,是根據(jù)phaser的奇偶狀態(tài)來設(shè)置的,用來存儲(chǔ)等待的線程。為了避免競爭,這里使用了Phaser的數(shù)值奇偶來存儲(chǔ),此外對于子相移器,它與其根相移器使用同一個(gè)evenQ或者oddQ,以加速釋放。

構(gòu)造方法

 
 
 
 
  1. public Phaser() {
  2.  this(null, 0);
  3. }
  4. public Phaser(int parties) {
  5.  this(null, parties);
  6. }
  7. public Phaser(Phaser parent) {
  8.  this(parent, 0);
  9. }
  10. public Phaser(Phaser parent, int parties) {
  11.  if (parties >>> PARTIES_SHIFT != 0)
  12.   throw new IllegalArgumentException("Illegal number of parties");
  13.     int phase = 0;
  14.     this.parent = parent;
  15.     if (parent != null) { // 父phaser不為空
  16.      final Phaser root = parent.root;
  17.         this.root = root; // 指向root phaser
  18.         this.evenQ = root.evenQ; // 兩個(gè)棧,整個(gè)phaser鏈只有一份
  19.         this.oddQ = root.oddQ;
  20.         if (parties != 0)
  21.          phase = parent.doRegister(1); // 向父phaser注冊當(dāng)前線程
  22.     }
  23.     else {
  24.      this.root = this; // 否則,自己是root phaser
  25.         this.evenQ = new AtomicReference(); // 負(fù)責(zé)創(chuàng)建兩個(gè)棧(QNode鏈)
  26.         this.oddQ = new AtomicReference();
  27.     }
  28.     // 狀態(tài)變量state的存儲(chǔ)分為三段
  29.     this.state = (parties == 0) ? (long)EMPTY :
  30.             ((long)phase << PHASE_SHIFT) |
  31.             ((long)parties << PARTIES_SHIFT) |
  32.             ((long)parties);
  33. }

 構(gòu)造函數(shù)中還有一個(gè)parent和root,這是用來構(gòu)造多層級階段的,用于構(gòu)成樹的。

重點(diǎn)還是還是看state的賦值方式,高32位存儲(chǔ)當(dāng)前階段phase,中間16位存儲(chǔ)參與者的數(shù)量,低16位存儲(chǔ)未完成參與者的數(shù)量。

主要方法

下面我們一起來看看幾個(gè)主要方法的源碼,重點(diǎn)是三個(gè)private的核心方法:doArrive、doRegister、reconcileState

register方法

增加一個(gè)參與者,需要同時(shí)增加parties和unarrived兩個(gè)數(shù)值,也就是state中的16位和低16位(中間16位存儲(chǔ)參與者的數(shù)量,低16位存儲(chǔ)未完成參與者的數(shù)量)

 
 
 
 
  1. public int register() {
  2.  return doRegister(1);
  3. }

 這里主要調(diào)用的是doRegister方法,我們往下看。

doRegister方法

 
 
 
 
  1. private int doRegister(int registrations) {
  2.     // adjustment to state
  3.     // state應(yīng)該加的值,注意這里是相當(dāng)于同時(shí)增加parties和unarrived
  4.     long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; //計(jì)算出需要調(diào)整的量
  5.     final Phaser parent = this.parent; //查看可能存在的相移器
  6.     int phase;
  7.     for (;;) {
  8.         // state的值
  9.         long s = (parent == null) ? state : reconcileState(); // reconcileState()方法是調(diào)整當(dāng)前phaser的狀態(tài)與root的一致
  10.         // state的低32未,也就是parties和unarrived的值
  11.         int counts = (int)s;
  12.         // parties的值
  13.         int parties = counts >>> PARTIES_SHIFT;
  14.         // unarrived的值
  15.         int unarrived = counts & UNARRIVED_MASK;
  16.         // 檢查是否溢出
  17.         if (registrations > MAX_PARTIES - parties) //如果需要注冊的數(shù)量超過運(yùn)行注冊的最大值,則拋出異常狀態(tài)異常
  18.             throw new IllegalStateException(badRegister(s));
  19.   // 當(dāng)前階段phase
  20.         phase = (int)(s >>> PHASE_SHIFT);
  21.         if (phase < 0) //如果當(dāng)前狀態(tài)為終止?fàn)顟B(tài)則跳出循環(huán)直接退出
  22.             break;
  23.         // 不是第一個(gè)參與者
  24.         if (counts != EMPTY) {          // not 1st registration //如果當(dāng)前狀態(tài)不是第一次注冊線程
  25.             if (parent == null || reconcileState() == s) { //如果當(dāng)相移器的父相移器為空,則直接信息CAS,如果當(dāng)前相移器部位空則調(diào)用reconcileState處理,這個(gè)稍后再看。reconcileState這里主要為了防止出現(xiàn)同步性錯(cuò)誤。
  26.                 // unarrived等于0說明當(dāng)前階段正在執(zhí)行onAdvance()方法,等待其執(zhí)行完畢
  27.                 if (unarrived == 0)             // wait out advance
  28.                     root.internalAwaitAdvance(phase, null);
  29.                 // 否則就修改state的值,增加adjust,如果成功就跳出循環(huán)
  30.                 else if (UNSAFE.compareAndSwapLong(this, stateOffset,
  31.                                                    s, s + adjust))
  32.                     break;
  33.             }
  34.         }
  35.         // 是第一個(gè)參與者,當(dāng)前狀態(tài)是第一次注冊。如果如果當(dāng)前相移器沒有父相移器。則直接進(jìn)行CAS
  36.         else if (parent == null) {          // 1st root registration
  37.             // 計(jì)算state的值
  38.             long next = ((long)phase << PHASE_SHIFT) | adjust;
  39.             // 修改state的值,如果成功就跳出循環(huán)
  40.             if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
  41.              break;
  42.             }
  43.         else { // 如果當(dāng)前是第一次設(shè)置,并且該相移器被組織在一個(gè)樹中則需要考慮一下,則需要使用內(nèi)置鎖來進(jìn)如
  44.             // 多層級階段的處理方式
  45.             synchronized (this) {               // 1st sub registration
  46.                 if (state == s) {               // recheck under lock 這里有可能發(fā)生競爭。所以這里還需要檢查一下,如果失敗則需退出同步區(qū)重新嘗試進(jìn)入。
  47.                     phase = parent.doRegister(1); // 調(diào)用其父相移器的注冊方法
  48.                     if (phase < 0)
  49.                         break;
  50.                     // finish registration whenever parent registration
  51.                     // succeeded, even when racing with termination,
  52.                     // since these are part of the same "transaction".
  53.                     while (!UNSAFE.compareAndSwapLong
  54.                            (this, stateOffset, s,
  55.                             ((long)phase << PHASE_SHIFT) | adjust)) {
  56.                         s = state;
  57.                         phase = (int)(root.state >>> PHASE_SHIFT);
  58.                         // assert (int)s == EMPTY;
  59.                     }
  60.                     break;
  61.                 }
  62.             }
  63.         }
  64.     }
  65.     return phase;
  66. }

 增加一個(gè)參與者的總體的邏輯為:

  • 增加一個(gè)參與者,需要同時(shí)增加parties和unarrived兩個(gè)數(shù)值,也就是state中的16位和低16位;
  • 如果是第一個(gè)參與者,則嘗試原子更新state的值,如果成功了就退出;
  • 如果不是第一個(gè)參與者,則檢查是不是在執(zhí)行onAdvance() , 如果是等待onAdvance() 執(zhí)行完成,如果否則嘗試原子更新state的值,直到成功退出;
  • 等待onAdvance() 完成是采用先自旋后進(jìn)入隊(duì)列排隊(duì)的方式等待,減少線程上下文切換;

arriveAndAwaitAdvance()方法

當(dāng)前線程當(dāng)前階段執(zhí)行完畢,等待其他線程完成當(dāng)前階段。 如果當(dāng)前線程是該階段最后一個(gè)到達(dá)的,則當(dāng)前線程會(huì)執(zhí)行onAdvance()方法,并喚醒其它線程進(jìn)入下一個(gè)階段。

 
 
 
 
  1. public int arriveAndAwaitAdvance() {
  2.      // Specialization of doArrive+awaitAdvance eliminating some reads/paths
  3.      final Phaser root = this.root;
  4.      for (;;) {
  5.          // state的值
  6.          long s = (root == this) ? state : reconcileState();
  7.          // 當(dāng)前階段
  8.          int phase = (int)(s >>> PHASE_SHIFT);
  9.          if (phase < 0)
  10.              return phase;
  11.          // parties 和 unarrived的值
  12.          int counts = (int)s;
  13.          // unarrived的值(state的低16位)
  14.          int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
  15.          if (unarrived <= 0)
  16.              throw new IllegalStateException(badArrive(s));
  17.          // 修改state的值
  18.          if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
  19.                                        s -= ONE_ARRIVAL)) {
  20.              // 如果不是最后一個(gè)到達(dá)的,則調(diào)用internalAwaitAdvance()方法自旋或進(jìn)入隊(duì)列等待
  21.              if (unarrived > 1)
  22.                  // 這里是直接返回了,internalAwaitAdvance()方法的源碼見register()方法解析
  23.                  return root.internalAwaitAdvance(phase, null);
  24.              // 到這里說明是最后一個(gè)到達(dá)的參與者
  25.              if (root != this)
  26.                  return parent.arriveAndAwaitAdvance();
  27.              // n 只保留了state中parties的部分,也就是中16位
  28.              long n = s & PARTIES_MASK;  // base of next state
  29.              // parties的值,即下一次需要到達(dá)的參與者數(shù)量
  30.              int nextUnarrived = (int)n >>> PARTIES_SHIFT;
  31.              // 執(zhí)行onAdvance()方法,返回true表示下一階段參與者數(shù)量為0了,也就是結(jié)束了
  32.              if (onAdvance(phase, nextUnarrived))
  33.                  n |= TERMINATION_BIT;
  34.              else if (nextUnarrived == 0)
  35.                  n |= EMPTY;
  36.              else
  37.                  n |= nextUnarrived; // n加上unarrived的值
  38.              // 下階段等待當(dāng)前階段加1
  39.              int nextPhase = (phase + 1) & MAX_PHASE;
  40.              // n 加上下一個(gè)階段的值
  41.              n |= (long)nextPhase << PHASE_SHIFT;
  42.              // 修改state的值為n
  43.              if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
  44.                  return (int)(state >>> PHASE_SHIFT); // terminated
  45.              // 喚醒其它參與者并進(jìn)入下一個(gè)階段
  46.              releaseWaiters(phase);
  47.              // 返回下一階段的值
  48.              return nextPhase;
  49.          }
  50.      }
  51.  }

 arriveAndAwaitAdvance的大致邏輯為:

  • 修改state中unarrived部分的值減1;
  • 如果不是最后一個(gè)到達(dá),則調(diào)用internalAwaitAdvance() 方法自旋或排隊(duì)等待;
  • 如果是最后一個(gè)到達(dá)的,則調(diào)用onAdvance() 方法,然后修改state的值為下一階段對應(yīng)的值,并喚醒其它等待的線程;
  • 返回下一階段俄值。

internalAwaitAdvance方法

internalAwaitAdvance方法。實(shí)際上Phaser中阻塞都是通過這個(gè)語句實(shí)現(xiàn)的。這個(gè)語句必須通過根相移器調(diào)用。換句話說所有的阻塞都是在根相移器阻塞的。

輸入?yún)?shù)中phase是需要阻塞的階段。node是用來跟蹤可能中斷的阻塞節(jié)點(diǎn)。

 
 
 
 
  1. // 等待onAdvance()方法執(zhí)行完畢
  2. // 原理是先自旋一定次數(shù),如果進(jìn)入下一個(gè)階段,這個(gè)方法直接返回了,
  3. // 如果自旋一定次數(shù)還沒有進(jìn)入下一個(gè)階段,則當(dāng)前線程入隊(duì)列,等待onAdvance()執(zhí)行完成喚醒
  4. private int internalAwaitAdvance(int phase, QNode node) {
  5.     // assert root == this;
  6.     // 保證隊(duì)列為空
  7.     releaseWaiters(phase-1);      // ensure old queue clean
  8.     boolean queued = false;       // true when node is enqueued
  9.     int lastUnarrived = 0;        // to increase spins upon change
  10.     // 自旋的次數(shù)
  11.     int spins = SPINS_PER_ARRIVAL;
  12.     long s;
  13.     int p;
  14.     // 檢查當(dāng)前階段是否變化,如果變化了說明進(jìn)入下一個(gè)階段了,這時(shí)候就沒有必要自旋了
  15.     while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
  16.         // 如果node為空,注冊的時(shí)候傳入的為空
  17.         if (node == null) {           // spinning in noninterruptible mode
  18.             // 未完成的參與者數(shù)量
  19.             int unarrived = (int)s & UNARRIVED_MASK;
  20.             // unarrived 有變化,增加自旋次數(shù)
  21.             if (unarrived != lastUnarrived &&
  22.                 (lastUnarrived = unarrived) < NCPU)
  23.                 spins += SPINS_PER_ARRIVAL;
  24.             boolean interrupted = Thread.interrupted();
  25.             // 自旋次數(shù)萬了,則新建一個(gè)節(jié)點(diǎn)
  26.             if (interrupted || --spins < 0) { // need node to record intr
  27.                 node = new QNode(this, phase, false, false, 0L);
  28.                 node.wasInterrupted = interrupted; 
    網(wǎng)站題目:并發(fā)編程之Phaser原理與應(yīng)用
    地址分享:http://www.5511xx.com/article/dpdeidh.html