日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
ParallelStream的坑,不踩不知道,一踩嚇一跳

本文轉(zhuǎn)載自微信公眾號「小姐姐味道」,作者小姐姐養(yǎng)的狗  。轉(zhuǎn)載本文請聯(lián)系小姐姐味道公眾號。 

很多同學(xué)喜歡使用lambda表達(dá)式,它允許你定義短小精悍的函數(shù),體現(xiàn)你高超的編碼水平。當(dāng)然,這個功能在某些以代碼行數(shù)來衡量工作量的公司來說,就比較吃虧一些。

比如下面的代碼片段,讓人閱讀的時候就像是讀詩一樣。但是一旦用不好,也是會要命的。

 
 
 
  1. List transactionsIds = 
  2. widgets.stream() 
  3.              .filter(b -> b.getColor() == RED) 
  4.              .sorted((x,y) -> x.getWeight() - y.getWeight()) 
  5.              .mapToInt(Widget::getWeight) 
  6.              .sum(); 

這段代碼有一個關(guān)鍵的函數(shù),那就是stream。通過它,可以將一個普通的list,轉(zhuǎn)化為流,然后就可以使用類似于管道的方式對list進(jìn)行操作??傊眠^的都說好。

對這些函數(shù)還不是太熟悉?可以參考:《到處是map、flatMap,啥意思?》

問題來了

假如我們把stream換成parallelStream,會發(fā)生什么情況?

根據(jù)字面上的意思,流會從串行 變成并行。

既然是并行,那用屁股想一想,就知道這里面肯定會有線程安全問題。不過我們這里討論的并不是要你使用線程安全的集合,這個話題太低級?,F(xiàn)階段,知道在線程不安全的環(huán)境中使用線程安全的集合,已經(jīng)是一個基本的技能。

這次踩坑的地方,是并行流的性能問題。

我們用代碼來說話。

下面的代碼,開啟了8個線程,這8個線程都在使用并行流進(jìn)行數(shù)據(jù)計算。在執(zhí)行的邏輯中,我們讓每個任務(wù)都sleep 1秒鐘,這樣就能夠模擬一些I/O請求的耗時等待。

使用stream,程序會在30秒后返回,但我們期望程序能夠在1秒多返回,因為它是并行流,得對得起這個稱號。

測試發(fā)現(xiàn),我們等了好久,任務(wù)才執(zhí)行完畢。

 
 
 
  1. static void paralleTest() { 
  2.     List numbers = Arrays.asList( 
  3.             0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 
  4.             10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 
  5.             20, 21, 22, 23, 24, 25, 26, 27, 28, 29 
  6.     ); 
  7.     final long begin = System.currentTimeMillis(); 
  8.     numbers.parallelStream().map(k -> { 
  9.         try { 
  10.             Thread.sleep(1000); 
  11.             System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); 
  12.         } catch (InterruptedException e) { 
  13.             e.printStackTrace(); 
  14.         } 
  15.         return k; 
  16.     }).collect(Collectors.toList()); 
  17.  
  18. public static void main(String[] args) { 
  19. //    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); 
  20.     new Thread(() -> paralleTest()).start(); 
  21.     new Thread(() -> paralleTest()).start(); 
  22.     new Thread(() -> paralleTest()).start(); 
  23.     new Thread(() -> paralleTest()).start(); 
  24.     new Thread(() -> paralleTest()).start(); 
  25.     new Thread(() -> paralleTest()).start(); 
  26.     new Thread(() -> paralleTest()).start(); 
  27.     new Thread(() -> paralleTest()).start(); 

實際上,在不同的機(jī)器上執(zhí)行,這段代碼花費(fèi)的時間都不一樣。

既然是并行,那肯定得有個并行度。太低了,體現(xiàn)不到并行的能能力;太大了,又浪費(fèi)了上下文切換的時間。我是很沮喪的發(fā)現(xiàn),很多高級研發(fā),將線程池的各種參數(shù)背的滾瓜爛熟,各種調(diào)優(yōu),竟然敢睜一只眼閉一只眼的在I/O密集型業(yè)務(wù)中用上parallelStream。

要了解這個并行度,我們需要查看具體的構(gòu)造方法。在ForkJoinPool類中找到這樣的代碼。

 
 
 
  1. try {  // ignore exceptions in accessing/parsing properties 
  2.     String pp = System.getProperty 
  3.         ("java.util.concurrent.ForkJoinPool.common.parallelism"); 
  4.     if (pp != null) 
  5.         parallelism = Integer.parseInt(pp); 
  6.     fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( 
  7.         "java.util.concurrent.ForkJoinPool.common.threadFactory"); 
  8.     handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( 
  9.         "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 
  10. } catch (Exception ignore) { 
  11.  
  12. if (fac == null) { 
  13.     if (System.getSecurityManager() == null) 
  14.         fac = defaultForkJoinWorkerThreadFactory; 
  15.     else // use security-managed default 
  16.         fac = new InnocuousForkJoinWorkerThreadFactory(); 
  17. if (parallelism < 0 && // default 1 less than #cores 
  18.     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) 
  19.     parallelism = 1; 
  20. if (parallelism > MAX_CAP) 
  21.     parallelism = MAX_CAP; 

可以看到,并行度到底是多少,是由下面的參數(shù)來控制的。如果無法獲取這個參數(shù),則默認(rèn)使用 CPU個數(shù)-1 的并行度。

可以看到,這個函數(shù)是為了計算密集型業(yè)務(wù)去設(shè)計的。如果你喂給它一大堆任務(wù),它就會由并行執(zhí)行退變成類似于串行的效果。

 
 
 
  1. -Djava.util.concurrent.ForkJoinPool.common.parallelism=N 

即使你使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N設(shè)置了一個初始值大小,它依然有問題。

因為,parallelism這個變量是final的,一旦設(shè)定,不允許修改。也就是說,上面的參數(shù)只會生效一次。

張三可能使用下面的代碼,設(shè)置了并行度大小為20。

 
 
 
  1. System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); 

李四可能用同樣的方式,設(shè)置了這個值為30。那實際在項目中用的是哪個值,那就得問JVM是怎么加載的類信息了。

這種方式并不太非常靠譜。

一種解決方式

我們可以通過提供外置的forkjoinpool,也就是改變提交方式,來實現(xiàn)不同類型的任務(wù)分離。

代碼如下所示,通過顯式的代碼提交,即可實現(xiàn)任務(wù)分離。

 
 
 
  1. ForkJoinPool pool = new ForkJoinPool(30); 
  2.  
  3. final long begin = System.currentTimeMillis(); 
  4. try { 
  5.     pool.submit(() -> 
  6.             numbers.parallelStream().map(k -> { 
  7.                 try { 
  8.                     Thread.sleep(1000); 
  9.                     System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); 
  10.                 } catch (InterruptedException e) { 
  11.                     e.printStackTrace(); 
  12.                 } 
  13.                 return k; 
  14.             }).collect(Collectors.toList())).get(); 
  15. } catch (InterruptedException e) { 
  16.     e.printStackTrace(); 
  17. } catch (ExecutionException e) { 
  18.     e.printStackTrace(); 

這樣,不同的場景,就可以擁有不同的并行度。這種方式和CountDownLatch有異曲同工之妙,我們需要手動管理資源。

使用了這種方式,代碼量增加,已經(jīng)和優(yōu)雅關(guān)系不大了,不僅不優(yōu)雅,而且丑的要命。白天鵝變成了丑小鴨,你還會愛它么?

作者簡介:小姐姐味道 (xjjdog),一個不允許程序員走彎路的公眾號。聚焦基礎(chǔ)架構(gòu)和Linux。十年架構(gòu),日百億流量,與你探討高并發(fā)世界,給你不一樣的味道。我的個人微信xjjdog0,歡迎添加好友,進(jìn)一步交流。


網(wǎng)站標(biāo)題:ParallelStream的坑,不踩不知道,一踩嚇一跳
文章地址:http://www.5511xx.com/article/codjhgj.html