日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
JavaConcurrentHashMap高并發(fā)安全實(shí)現(xiàn)原理解析

 一、概述

ConcurrentHashMap (以下簡(jiǎn)稱C13Map) 是并發(fā)編程出場(chǎng)率最高的數(shù)據(jù)結(jié)構(gòu)之一,大量的并發(fā)CASE背后都有C13Map的支持,同時(shí)也是JUC包中代碼量最大的組件(6000多行),自JDK8開始Oracle對(duì)其進(jìn)行了大量?jī)?yōu)化工作。

本文從 HashMap 的基礎(chǔ)知識(shí)開始,嘗試逐一分析C13Map中各個(gè)組件的實(shí)現(xiàn)和安全性保證。

二、HashMap基礎(chǔ)知識(shí) 

分析C13MAP前,需要了解以下的HashMap知識(shí)或者約定:

  •  哈希表的長(zhǎng)度永遠(yuǎn)都是2的冪次方,原因是hashcode%tableSize==hashcode&(tableSize-1),也就是哈希槽位的確定可以用一次與運(yùn)算來替代取余運(yùn)算。
  •  會(huì)對(duì)hashcode調(diào)用若干次擾動(dòng)函數(shù),將高16位與低16位做異或運(yùn)算,因?yàn)楦?6位的隨機(jī)性更強(qiáng)。
  •  當(dāng)表中的元素總數(shù)超過tableSize * 0.75時(shí),哈希表會(huì)發(fā)生擴(kuò)容操作,每次擴(kuò)容的tableSize是原先的兩倍。
  •  下文提到的槽位(bucket)、哈希分桶、BIN均表示同一個(gè)概念,即哈希table上的某一列。
  •  舊表在做搬運(yùn)時(shí)i槽位的node可以根據(jù)其哈希值的第tableSize位的bit決定在新表上的槽位是i還是i+tableSize。
  •  每個(gè)槽位上有可能會(huì)出現(xiàn)哈希沖突,在未達(dá)到某個(gè)閾值時(shí)它是一個(gè)鏈表結(jié)構(gòu),達(dá)到閾值后會(huì)升級(jí)到紅黑樹結(jié)構(gòu)。
  •  HashMap本身并非為多線程環(huán)境設(shè)計(jì),永遠(yuǎn)不要嘗試在并發(fā)環(huán)境下直接使用HashMap,C13Map不存在這個(gè)安全問題。

三、C13Map的字段定義

C13Map的字段定義 

 
 
 
 
  1. //最大容量  
  2. private static final int MAXIMUM_CAPACITY = 1 << 30;   
  3. //默認(rèn)初始容量  
  4. private static final int DEFAULT_CAPACITY = 16;  
  5. //數(shù)組的最大容量,防止拋出OOM  
  6. static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;   
  7. //最大并行度,僅用于兼容JDK1.7以前版本  
  8. private static final int DEFAULT_CONCURRENCY_LEVEL = 16;  
  9. //擴(kuò)容因子  
  10. private static final float LOAD_FACTOR = 0.75f;  
  11. //鏈表轉(zhuǎn)紅黑樹的閾值  
  12. static final int TREEIFY_THRESHOLD = 8;  
  13. //紅黑樹退化閾值  
  14. static final int UNTREEIFY_THRESHOLD = 6;  
  15. //鏈表轉(zhuǎn)紅黑樹的最小總量  
  16. static final int MIN_TREEIFY_CAPACITY = 64;  
  17. //擴(kuò)容搬運(yùn)時(shí)批量搬運(yùn)的最小槽位數(shù)  
  18. private static final int MIN_TRANSFER_STRIDE = 16;  
  19. //當(dāng)前待擴(kuò)容table的郵戳位,通常是高16位  
  20. private static final int RESIZE_STAMP_BITS = 16;  
  21. //同時(shí)搬運(yùn)的線程數(shù)自增的最大值  
  22. private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;  
  23. //搬運(yùn)線程數(shù)的標(biāo)識(shí)位,通常是低16位  
  24. private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;  
  25. static final int MOVED     = -1; // 說明是forwardingNode  
  26. static final int TREEBIN   = -2; // 紅黑樹  
  27. static final int RESERVED  = -3; // 原子計(jì)算的占位Node  
  28. static final int HASH_BITS = 0x7fffffff; // 保證hashcode擾動(dòng)計(jì)算結(jié)果為正數(shù)  
  29. //當(dāng)前哈希表  
  30. transient volatile Node[] table;  
  31. //下一個(gè)哈希表  
  32. private transient volatile Node[] nextTable;  
  33. //計(jì)數(shù)的基準(zhǔn)值  
  34. private transient volatile long baseCount;  
  35. //控制變量,不同場(chǎng)景有不同用途,參考下文  
  36. private transient volatile int sizeCtl;  
  37. //并發(fā)搬運(yùn)過程中CAS獲取區(qū)段的下限值  
  38. private transient volatile int transferIndex;  
  39. //計(jì)數(shù)cell初始化或者擴(kuò)容時(shí)基于此字段使用自旋鎖  
  40. private transient volatile int cellsBusy;  
  41. //加速多核CPU計(jì)數(shù)的cell數(shù)組  
  42. private transient volatile CounterCell[] counterCells; 

四、安全操作Node數(shù)組 

 
 
 
 
  1. static final  Node tabAt(Node[] tab, int i) {  
  2.     return (Node)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);  
  3. }  
  4. static final  boolean casTabAt(Node[] tab, int i,  
  5.                                     Node c, Node v) {  
  6.     return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);  
  7. }  
  8. static final  void setTabAt(Node[] tab, int i, Node v) {  
  9.     U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v); 

對(duì)Node[] 上任意一個(gè)index的讀取和寫入都使用了Unsafe輔助類,table本身是volatile類型的并不能保證其下的每個(gè)元素的內(nèi)存語義也是volatile類型;

需要借助于Unsafe來保證Node[]元素的“讀/寫/CAS”操作在多核并發(fā)環(huán)境下的原子或者可見性。

五、讀操作get為什么是線程安全的

首先需要明確的是,C13Map的讀操作一般是不加鎖的(TreeBin的讀寫鎖除外),而讀操作與寫操作有可能并行;可以保證的是,因?yàn)镃13Map的寫操作都要獲取bin頭部的syncronized互斥鎖,能保證最多只有一個(gè)線程在做更新,這其實(shí)是一個(gè)單線程寫、多線程讀的并發(fā)安全性的問題。

C13Map的get方法 

 
 
 
 
  1. public V get(Object key) {  
  2.     Node[] tab; Node e, p; int n, eh; K ek;  
  3.     //執(zhí)行擾動(dòng)函數(shù)  
  4.     int h = spread(key.hashCode());  
  5.     if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {  
  6.         if ((eeh = e.hash) == h) {  
  7.             if ((eek = e.key) == key || (ek != null && key.equals(ek)))  
  8.                 return e.val;  
  9.         } 
  10.          else if (eh < 0)  
  11.             return (p = e.find(h, key)) != null ? p.val : null;  
  12.         while ((ee = e.next) != null) {  
  13.             if (e.hash == h &&  
  14.                 ((eek = e.key) == key || (ek != null && key.equals(ek))))  
  15.                 return e.val; 
  16.          }  
  17.     }  
  18.     return null;  

1、如果當(dāng)前哈希表table為null

哈希表未初始化或者正在初始化未完成,直接返回null;雖然line5和line18之間其它線程可能經(jīng)歷了千山萬水,至少在判斷tab==null的時(shí)間點(diǎn)key肯定是不存在的,返回null符合某一時(shí)刻的客觀事實(shí)。

2、如果讀取的bin頭節(jié)點(diǎn)為null

說明該槽位尚未有節(jié)點(diǎn),直接返回null。

3、如果讀取的bin是一個(gè)鏈表

說明頭節(jié)點(diǎn)是個(gè)普通Node。

(1)如果正在發(fā)生鏈表向紅黑樹的treeify工作,因?yàn)閠reeify本身并不破壞舊的鏈表bin的結(jié)構(gòu),只是在全部treeify完成后將頭節(jié)點(diǎn)一次性替換為新創(chuàng)建的TreeBin,可以放心讀取。

(2)如果正在發(fā)生resize且當(dāng)前bin正在被transfer,因?yàn)閠ransfer本身并不破壞舊的鏈表bin的結(jié)構(gòu),只是在全部transfer完成后將頭節(jié)點(diǎn)一次性替換為ForwardingNode,可以放心讀取。

(3)如果其它線程正在操作鏈表,在當(dāng)前線程遍歷鏈表的任意一個(gè)時(shí)間點(diǎn),都有可能同時(shí)在發(fā)生add/replace/remove操作。

  •  如果是add操作,因?yàn)殒湵淼墓?jié)點(diǎn)新增從JDK8以后都采用了后入式,無非是多遍歷或者少遍歷一個(gè)tailNode。
  •  如果是remove操作,存在遍歷到某個(gè)Node時(shí),正好有其它線程將其remove,導(dǎo)致其孤立于整個(gè)鏈表之外;但因?yàn)槠鋘ext引用未發(fā)生變更,整個(gè)鏈表并沒有斷開,還是可以照常遍歷鏈表直到tailNode。
  •  如果是replace操作,鏈表的結(jié)構(gòu)未變,只是某個(gè)Node的value發(fā)生了變化,沒有安全問題。

結(jié)論:對(duì)于鏈表這種線性數(shù)據(jù)結(jié)構(gòu),單線程寫且插入操作保證是后入式的前提下,并發(fā)讀取是安全的;不會(huì)存在誤讀、鏈表斷開導(dǎo)致的漏讀、讀到環(huán)狀鏈表等問題。

4、如果讀取的bin是一個(gè)紅黑樹

說明頭節(jié)點(diǎn)是個(gè)TreeBin節(jié)點(diǎn)。

(1)如果正在發(fā)生紅黑樹向鏈表的untreeify操作,因?yàn)閡ntreeify本身并不破壞舊的紅黑樹結(jié)構(gòu),只是在全部untreeify完成后將頭節(jié)點(diǎn)一次性替換為新創(chuàng)建的普通Node,可以放心讀取。

(2)如果正在發(fā)生resize且當(dāng)前bin正在被transfer,因?yàn)閠ransfer本身并不破壞舊的紅黑樹結(jié)構(gòu),只是在全部transfer完成后將頭節(jié)點(diǎn)一次性替換為ForwardingNode,可以放心讀取。

(3)如果其他線程在操作紅黑樹,在當(dāng)前線程遍歷紅黑樹的任意一個(gè)時(shí)間點(diǎn),都可能有單個(gè)的其它線程發(fā)生add/replace/remove/紅黑樹的翻轉(zhuǎn)等操作,參考下面的紅黑樹的讀寫鎖實(shí)現(xiàn)。

TreeBin中的讀寫鎖實(shí)現(xiàn) 

 
 
 
 
  1.  TreeNode root;  
  2.     volatile TreeNode first;  
  3.     volatile Thread waiter;  
  4.     volatile int lockState;  
  5.     // values for lockState  
  6.     static final int WRITER = 1; // set while holding write lock  
  7.     static final int WAITER = 2; // set when waiting for write lock  
  8.     static final int READER = 4; // increment value for setting read lock  
  9.     private final void lockRoot() {  
  10.         //如果一次性獲取寫鎖失敗,進(jìn)入contendedLock循環(huán)體,循環(huán)獲取寫鎖或者休眠等待  
  11.         if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER))  
  12.             contendedLock(); // offload to separate method  
  13.     }  
  14.     private final void unlockRoot() {  
  15.         lockState = 0; 
  16.     }  
  17.     //對(duì)紅黑樹加互斥鎖,也就是寫鎖  
  18.     private final void contendedLock() {  
  19.         boolean waiting = false;  
  20.         for (int s;;) {  
  21.             //如果lockState除了第二位外其它位上都為0,表示紅黑樹當(dāng)前既沒有上讀鎖,又沒有上寫鎖,僅有可能存在waiter,可以嘗試直接獲取寫鎖  
  22.             if (((s = lockState) & ~WAITER) == 0) {  
  23.                 if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) { 
  24.                      if (waiting)  
  25.                         waiter = null;  
  26.                     return; 
  27.                 }  
  28.             }  
  29.             //如果lockState第二位是0,表示當(dāng)前沒有線程在等待寫鎖  
  30.             else if ((s & WAITER) == 0) {  
  31.                 //將lockState的第二位設(shè)置為1,相當(dāng)于打上了waiter的標(biāo)記,表示有線程在等待寫鎖  
  32.                 if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) {  
  33.                     waiting = true;  
  34.                     waiter = Thread.currentThread();  
  35.                 }  
  36.             }  
  37.             //休眠當(dāng)前線程  
  38.             else if (waiting)  
  39.                 LockSupport.park(this);  
  40.         }  
  41.     } 
  42.      //查找紅黑樹中的某個(gè)節(jié)點(diǎn)  
  43.     final Node find(int h, Object k) {  
  44.         if (k != null) {  
  45.             for (Node e = first; e != null; ) {  
  46.                 int s; K ek;  
  47.                 //如果當(dāng)前有waiter或者有寫鎖,走線性檢索,因?yàn)榧t黑樹雖然替代了鏈表,但其內(nèi)部依然保留了鏈表的結(jié)構(gòu),雖然鏈表的查詢性能一般,但根據(jù)先前的分析其讀取的安全性有保證。  
  48.                 //發(fā)現(xiàn)有寫鎖改走線性檢索,是為了避免等待寫鎖釋放花去太久時(shí)間; 而發(fā)現(xiàn)有waiter改走線性檢索,是為了避免讀鎖疊加的太多,導(dǎo)致寫鎖線程需要等待太長(zhǎng)的時(shí)間; 本質(zhì)上都是為了減少讀寫碰撞 
  49.                  //線性遍歷的過程中,每遍歷到下一個(gè)節(jié)點(diǎn)都做一次判斷,一旦發(fā)現(xiàn)鎖競(jìng)爭(zhēng)的可能性減少就改走tree檢索以提高性能 
  50.                  if (((s = lockState) & (WAITER|WRITER)) != 0) {  
  51.                     if (e.hash == h &&  
  52.                         ((eek = e.key) == k || (ek != null && k.equals(ek))))  
  53.                         return e;  
  54.                     ee = e.next;  
  55.                 }  
  56.                 //對(duì)紅黑樹加共享鎖,也就是讀鎖,CAS一次性增加4,也就是增加的只是3~32位  
  57.                 else if (U.compareAndSetInt(this, LOCKSTATE, s, 
  58.                                               s + READER)) {  
  59.                     TreeNode r, p;  
  60.                     try {  
  61.                         p = ((r = root) == null ? null :  
  62.                              r.findTreeNode(h, k, null));  
  63.                     } finally {  
  64.                         Thread w;  
  65.                         //釋放讀鎖,如果釋放完畢且有waiter,則將其喚醒  
  66.                         if (U.getAndAddInt(this, LOCKSTATE, -READER) ==  
  67.                             (READER|WAITER) && (w = waiter) != null)  
  68.                             LockSupport.unpark(w);  
  69.                     }  
  70.                     return p;  
  71.                 }  
  72.             }  
  73.         }  
  74.         return null;  
  75.     }  
  76.     //更新紅黑樹中的某個(gè)節(jié)點(diǎn)  
  77.     final TreeNode putTreeVal(int h, K k, V v) {  
  78.         Class kc = null;  
  79.         boolean searched = false;  
  80.         for (TreeNode p = root;;) {  
  81.             int dir, ph; K pk;  
  82.             //...省略處理紅黑樹數(shù)據(jù)結(jié)構(gòu)的代碼若干        
  83.                  else {  
  84.                     //寫操作前加互斥鎖  
  85.                     lockRoot();  
  86.                     try {  
  87.                         root = balanceInsertion(root, x);  
  88.                     } finally {  
  89.                         //釋放互斥鎖 
  90.                          unlockRoot();  
  91.                     }  
  92.                 }  
  93.                 break;  
  94.             }  
  95.         }  
  96.         assert checkInvariants(root);  
  97.         return null;  
  98.     }  

紅黑樹內(nèi)置了一套讀寫鎖的邏輯,其內(nèi)部定義了32位的int型變量lockState,第1位是寫鎖標(biāo)志位,第2位是寫鎖等待標(biāo)志位,從3~32位則是共享鎖標(biāo)志位。

讀寫操作是互斥的,允許多個(gè)線程同時(shí)讀取,但不允許讀寫操作并行,同一時(shí)刻只允許一個(gè)線程進(jìn)行寫操作;這樣任意時(shí)間點(diǎn)讀取的都是一個(gè)合法的紅黑樹,整體上是安全的。

有的同學(xué)會(huì)產(chǎn)生疑惑,寫鎖釋放時(shí)為何沒有將waiter喚醒的操作呢?是否有可能A線程進(jìn)入了等待區(qū),B線程獲取了寫鎖,釋放寫鎖時(shí)僅做了lockState=0的操作。

那么A線程是否就沒有機(jī)會(huì)被喚醒了,只有等待下一個(gè)讀鎖釋放時(shí)的喚醒了呢 ?

顯然這種情況違背常理,C13Map不會(huì)出現(xiàn)這樣的疏漏,再進(jìn)一步觀察,紅黑樹的變更操作的外圍,也就是在putValue/replaceNode那一層,都是對(duì)BIN的頭節(jié)點(diǎn)加了synchornized互斥鎖的,同一時(shí)刻只能有一個(gè)寫線程進(jìn)入TreeBin的方法范圍內(nèi),當(dāng)寫線程發(fā)現(xiàn)當(dāng)前waiter不為空,其實(shí)此waiter只能是當(dāng)前線程自己,可以放心的獲取寫鎖,不用擔(dān)心無法被喚醒的問題。

TreeBin在find讀操作檢索時(shí),在linearSearch(線性檢索)和treeSearch(樹檢索)間做了折衷,前者性能差但并發(fā)安全,后者性能佳但要做并發(fā)控制,可能導(dǎo)致鎖競(jìng)爭(zhēng);設(shè)計(jì)者使用線性檢索來盡量避免讀寫碰撞導(dǎo)致的鎖競(jìng)爭(zhēng),但評(píng)估到race condition已消失時(shí),又立即趨向于改用樹檢索來提高性能,在安全和性能之間做到了極佳的平衡。具體的折衷策略請(qǐng)參考find方法及注釋。

由于有線性檢索這樣一個(gè)抄底方案,以及入口處bin頭節(jié)點(diǎn)的synchornized機(jī)制,保證了進(jìn)入到TreeBin整體代碼塊的寫線程只有一個(gè);TreeBin中讀寫鎖的整體設(shè)計(jì)與ReentrantReadWriteLock相比還是簡(jiǎn)單了不少,比如并未定義用于存放待喚醒線程的threadQueue,以及讀線程僅會(huì)自旋而不會(huì)阻塞等等, 可以看做是特定條件下ReadWriteLock的簡(jiǎn)化版本。

5、如果讀取的bin是一個(gè)ForwardingNode

說明當(dāng)前bin已遷移,調(diào)用其find方法到nextTable讀取數(shù)據(jù)。

forwardingNode的find方法 

 
 
 
 
  1. static final class ForwardingNode extends Node {  
  2.     final Node[] nextTable;  
  3.     ForwardingNode(Node[] tab) {  
  4.         super(MOVED, null, null);  
  5.         this.nextTable = tab;  
  6.     }  
  7.      //遞歸檢索哈希表鏈  
  8.     Node find(int h, Object k) {  
  9.         // loop to avoid arbitrarily deep recursion on forwarding nodes  
  10.         outer: for (Node[] tab = nextTable;;) {  
  11.             Node e; int n;  
  12.             if (k == null || tab == null || (n = tab.length) == 0 ||  
  13.                 (e = tabAt(tab, (n - 1) & h)) == null)  
  14.                 return null;  
  15.             for (;;) {  
  16.                 int eh; K ek; 
  17.                  if ((eeh = e.hash) == h &&  
  18.                     ((eek = e.key) == k || (ek != null && k.equals(ek))))  
  19.                     return e;  
  20.                 if (eh < 0) {  
  21.                     if (e instanceof ForwardingNode) {  
  22.                         tab = ((ForwardingNode)e).nextTable;  
  23.                         continue outer;  
  24.                     }  
  25.                     else  
  26.                         return e.find(h, k);  
  27.                 }  
  28.                 if ((ee = e.next) == null)  
  29.                     return null;  
  30.             }  
  31.         }  
  32.     }  

ForwardingNode中保存了nextTable的引用,會(huì)轉(zhuǎn)向下一個(gè)哈希表進(jìn)行檢索,但并不能保證nextTable就一定是currentTable,因?yàn)樵诟卟l(fā)插入的情況下,極短時(shí)間內(nèi)就可以導(dǎo)致哈希表的多次擴(kuò)容,內(nèi)存中極有可能駐留一條哈希表鏈,彼此以bin的頭節(jié)點(diǎn)上的ForwardingNode相連,線程剛讀取時(shí)拿到的是table1,遍歷時(shí)卻有可能經(jīng)歷了哈希表的鏈條。

eh<0有三種情況:

  •  如果是ForwardingNode繼續(xù)遍歷下一個(gè)哈希表。
  •  如果是TreeBin,調(diào)用其find方法進(jìn)入TreeBin讀寫鎖的保護(hù)區(qū)讀取數(shù)據(jù)。
  •  如果是ReserveNode,說明當(dāng)前有compute計(jì)算中,整條bin還是一個(gè)空結(jié)構(gòu),直接返回null。

6、如果讀取的bin是一個(gè)ReserveNode

ReserveNode用于compute/computeIfAbsent原子計(jì)算的方法,在BIN的頭節(jié)點(diǎn)為null且計(jì)算尚未完成時(shí),先在bin的頭節(jié)點(diǎn)打上一個(gè)ReserveNode的占位標(biāo)記。

讀操作發(fā)現(xiàn)ReserveNode直接返回null,寫操作會(huì)因?yàn)闋?zhēng)奪ReserveNode的互斥鎖而進(jìn)入阻塞態(tài),在compute完成后被喚醒后循環(huán)重試。

六、寫操作putValue/replaceNode為什么是線程安全的

典型的編程范式如下:

C13Map的putValue方法 

 
 
 
 
  1. Node[] tab = table;  //將堆中的table變量賦給線程堆棧中的局部變量  
  2. Node f = tabAt(tab, i );  
  3. if(f==null){  
  4.  //當(dāng)前槽位沒有頭節(jié)點(diǎn),直接CAS寫入  
  5.  if (casTabAt(tab, i, null, new Node(hash, key, value)))  
  6.     break;  
  7. }else if(f.hash == MOVED){  
  8.  //加入?yún)f(xié)助搬運(yùn)行列  
  9.  helpTransfer(tab,f);  
  10. }  
  11. //不是forwardingNode  
  12. else if(f.hash != MOVED){  
  13.     //先鎖住I槽位上的頭節(jié)點(diǎn)  
  14.     synchronized (f) {  
  15.     //再doubleCheck看此槽位上的頭節(jié)點(diǎn)是否還是f  
  16.     if (tabAt(tab, i) == f) {  
  17.        ...各種寫操作  
  18.     }  
  19.   }  

1、當(dāng)前槽位如果頭節(jié)點(diǎn)為null時(shí),直接CAS寫入

有人也許會(huì)質(zhì)疑,如果寫入時(shí)resize操作已完成,發(fā)生了table向nextTable的轉(zhuǎn)變,是否會(huì)存在寫入的是舊表的bin導(dǎo)致數(shù)據(jù)丟失的可能 ? 

這種可能性是不存在的,因?yàn)橐粋€(gè)table在resize完成后所有的BIN都會(huì)被打上ForwardingNode的標(biāo)記,可以形象的理解為所有槽位上都插滿了紅旗,而此處在CAS時(shí)的compare的變量null,能夠保證至少在CAS原子操作發(fā)生的時(shí)間點(diǎn)table并未發(fā)生變更。

2、當(dāng)前槽位如果頭節(jié)點(diǎn)不為null

這里采用了一個(gè)小技巧:先鎖住I槽位上的頭節(jié)點(diǎn),進(jìn)入同步代碼塊后,再doubleCheck看此槽位上的頭節(jié)點(diǎn)是否有變化。

進(jìn)入同步塊后還需要doubleCheck的原因:雖然一開始獲取到的頭節(jié)點(diǎn)f并非ForwardingNode,但在獲取到f的同步鎖之前,可能有其它線程提前獲取了f的同步鎖并完成了transfer工作,并將I槽位上的頭節(jié)點(diǎn)標(biāo)記為ForwardingNode,此時(shí)的f就成了一個(gè)過時(shí)的bin的頭節(jié)點(diǎn)。

然而因?yàn)闃?biāo)記操作與transfer作為一個(gè)整體在同步的代碼塊中執(zhí)行,如果doubleCheck的結(jié)果是此槽位上的頭節(jié)點(diǎn)還是f,則表明至少在當(dāng)前時(shí)間點(diǎn)該槽位還沒有被transfer到新表(假如當(dāng)前有transfer in progress的話),可以放心的對(duì)該bin進(jìn)行put/remove/replace等寫操作。

只要未發(fā)生transfer或者treeify操作,鏈表的新增操作都是采取后入式,頭節(jié)點(diǎn)一旦確定不會(huì)輕易改變,這種后入式的更新方式保證了鎖定頭節(jié)點(diǎn)就等于鎖住了整個(gè)bin。

如果不作doubleCheck判斷,則有可能當(dāng)前槽位已被transfer,寫入的還是舊表的BIN,從而導(dǎo)致寫入數(shù)據(jù)的丟失;也有可能在獲取到f的同步鎖之前,其它線程對(duì)該BIN做了treeify操作,并將頭節(jié)點(diǎn)替換成了TreeBin, 導(dǎo)致寫入的是舊的鏈表,而非新的紅黑樹;

3、doubleCheck是否有ABA問題

也許有人會(huì)質(zhì)疑,如果有其它線程提前對(duì)當(dāng)前bin進(jìn)行了的remove/put的操作,引入了新的頭節(jié)點(diǎn),并且恰好發(fā)生了JVM的內(nèi)存釋放和重新分配,導(dǎo)致新的Node的引用地址恰好跟舊的相同,也就是存在所謂的ABA問題。

這個(gè)可以通過反證法來推翻,在帶有GC機(jī)制的語言環(huán)境下通常不會(huì)發(fā)生ABA問題,因?yàn)楫?dāng)前線程包含了對(duì)頭節(jié)點(diǎn)f的引用,當(dāng)前線程并未消亡,不可能存在f節(jié)點(diǎn)的內(nèi)存被GC回收的可能性。

還有人會(huì)質(zhì)疑,如果在寫入過程中主哈希表發(fā)生了變化,是否可能寫入的是舊表的bin導(dǎo)致數(shù)據(jù)丟失,這個(gè)也可以通過反證法來推翻,因?yàn)閠able向nextTable的轉(zhuǎn)化(也就是將resize后的新哈希表正式commit)只有在所有的槽位都已經(jīng)transfer成功后才會(huì)進(jìn)行,只要有一個(gè)bin未transfer成功,則說明當(dāng)前的table未發(fā)生變化,在當(dāng)前的時(shí)間點(diǎn)可以放心的向table的bin內(nèi)寫入數(shù)據(jù)。

4、如何操作才安全

可以總結(jié)出規(guī)律,在對(duì)table的槽位成功進(jìn)行了CAS操作且compare值為null,或者對(duì)槽位的非forwardingNode的頭節(jié)點(diǎn)加鎖后,doubleCheck頭節(jié)點(diǎn)未發(fā)生變化,對(duì)bin的寫操作都是安全的。

七、原子計(jì)算相關(guān)方法

原子計(jì)算主要包括:computeIfAbsent、computeIfPresent、compute、merge四個(gè)方法。 

1、幾個(gè)方法的比較 

主要區(qū)別如下:

(1)computeIfAbsent只會(huì)在判斷到key不存在時(shí)才會(huì)插入,判空與插入是一個(gè)原子操作,提供的FunctionalInterface是一個(gè)二元的Function, 接受key參數(shù),返回value結(jié)果;如果計(jì)算結(jié)果為null則不做插入。

(2)computeIfPresent只會(huì)在判讀單到Key非空時(shí)才會(huì)做更新,判斷非空與插入是一個(gè)原子操作,提供的FunctionalInterface是一個(gè)三元的BiFunction,接受key,value兩個(gè)參數(shù),返回新的value結(jié)果;如果新的value為null則刪除key對(duì)應(yīng)節(jié)點(diǎn)。

(3)compute則不加key是否存在的限制,提供的FunctionalInterface是一個(gè)三元的BiFunction,接受key,value兩個(gè)參數(shù),返回新的value結(jié)果;如果舊的value不存在則以null替代進(jìn)行計(jì)算;如果新的value為null則保證key對(duì)應(yīng)節(jié)點(diǎn)不會(huì)存在。

(4)merge不加key是否存在的限制,提供的FunctionalInterface是一個(gè)三元的BiFunction,接受oldValue, newVALUE兩個(gè)參數(shù),返回merge后的value;如果舊的value不存在,直接以newVALUE作為最終結(jié)果,存在則返回merge后的結(jié)果;如果最終結(jié)果為null,則保證key對(duì)應(yīng)節(jié)點(diǎn)不會(huì)存在。

2、何時(shí)會(huì)使用ReserveNode占位

如果目標(biāo)bin的頭節(jié)點(diǎn)為null,需要寫入的話有兩種手段:一種是生成好新的節(jié)點(diǎn)r后使用casTabAt(tab, i, null, r)原子操作,因?yàn)閏ompare的值為null可以保證并發(fā)的安全;

另外一種方式是創(chuàng)建一個(gè)占位的ReserveNode,鎖住該節(jié)點(diǎn)并將其CAS設(shè)置到bin的頭節(jié)點(diǎn),再進(jìn)行進(jìn)一步的原子計(jì)算操作;這兩種辦法都有可能在CAS的時(shí)候失敗,需要自旋反復(fù)嘗試。

(1)為什么只有computeIfAbsent/compute方法使用占位符的方式

computeIfPresent只有在BIN結(jié)構(gòu)非空的情況下才會(huì)展開原子計(jì)算,自然不存在需要ReserveNode占位的情況;鎖住已有的頭節(jié)點(diǎn)即可。

computeIfAbsent/compute方法在BIN結(jié)構(gòu)為空時(shí),需要展開Function或者BiFunction的運(yùn)算,這個(gè)操作是外部引入的需要耗時(shí)多久無法準(zhǔn)確評(píng)估;這種情況下如果采用先計(jì)算,再casTabAt(tab, i, null, r)的方式,如果有其它線程提前更新了這個(gè)BIN,那么就需要重新鎖定新加入的頭節(jié)點(diǎn),并重復(fù)一次原子計(jì)算(C13Map無法幫你緩存上次計(jì)算的結(jié)果,因?yàn)橛?jì)算的入?yún)⒂锌赡軙?huì)變化),這個(gè)開銷是比較大的。

而使用ReserveNode占位的方式無需等到原子計(jì)算出結(jié)果,可以第一時(shí)間先搶占BIN的所有權(quán),使其他并發(fā)的寫線程阻塞。

(2)merge方法為何不需要占位

原因是如果BIN結(jié)構(gòu)為空時(shí),根據(jù)merge的處理策略,老的value為空則直接使用新的value替代,這樣就省去了BiFunction中新老value進(jìn)行merge的計(jì)算,這個(gè)消耗幾乎是沒有的;因此可以使用casTabAt(tab, i, null, r)的方式直接修改,避免了使用ReserveNode占位,鎖定該占位ReserveNode后再進(jìn)行CAS修改的兩次CAS無謂的開銷。

C13Map的compute方法 

 
 
 
 
  1. public V compute(K key,  
  2.                  BiFunction remappingFunction) {  
  3.     if (key == null || remappingFunction == null)  
  4.         throw new nullPointerException();  
  5.     int h = spread(key.hashCode());  
  6.     V val = null;  
  7.     int delta = 0;  
  8.     int binCount = 0;  
  9.     for (Node[] tab = table; ; ) {  
  10.         Node f;  
  11.         int n, i, fh;  
  12.         if (tab == null || (n = tab.length) == 0)  
  13.             tab = initTable();  
  14.         else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {  
  15.             //創(chuàng)建占位Node  
  16.             Node r = new ReservationNode();  
  17.            //先鎖定該占位Node  
  18.             synchronized (r) {  
  19.                 //將其設(shè)置到BIN的頭節(jié)點(diǎn)  
  20.                 if (casTabAt(tab, i, null, r)) {  
  21.                     binCount = 1; 
  22.                      Node node = null;  
  23.                     try {  
  24.                         //開始原子計(jì)算  
  25.                         if ((val = remappingFunction.apply(key, null)) != null) {  
  26.                             delta = 1; 
  27.                              node = new Node(h, key, val, null);  
  28.                         }  
  29.                     } finally {  
  30.                         //設(shè)置計(jì)算后的最終節(jié)點(diǎn)  
  31.                         setTabAt(tab, i, node);  
  32.                     }  
  33.                 }  
  34.             } 
  35.              if (binCount != 0)  
  36.                 break;  
  37.         } else if ((ffh = f.hash) == MOVED) 
    文章題目:JavaConcurrentHashMap高并發(fā)安全實(shí)現(xiàn)原理解析
    瀏覽路徑:http://www.5511xx.com/article/dhgdddj.html