日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
詳細的.Net并行編程高級教程--Parallel

一直覺得自己對并發(fā)了解不夠深入,特別是看了《代碼整潔之道》覺得自己有必要好好學學并發(fā)編程,因為性能也是衡量代碼整潔的一大標準。而且在《失控》這本書中也多次提到并發(fā),不管是計算機還是生物都并發(fā)處理著各種事物。人真是奇怪,當你關注一個事情的時候,你會發(fā)現(xiàn)周圍的事物中就常出現(xiàn)那個事情。所以好奇心驅使下學習并發(fā)。便有了此文。

一、理解硬件線程和軟件線程

     多核處理器帶有一個以上的物理內(nèi)核--物理內(nèi)核是真正的獨立處理單元,多個物理內(nèi)核使得多條指令能夠同時并行運行。硬件線程也稱為邏輯內(nèi)核,一個物理內(nèi) 核可以使用超線程技術提供多個硬件線程。所以一個硬件線程并不代表一個物理內(nèi)核;Windows中每個運行的程序都是一個進程,每一個進程都會創(chuàng)建并運行 一個或多個線程,這些線程稱為軟件線程。硬件線程就像是一條泳道,而軟件線程就是在其中游泳的人。

二、并行場合

    .Net Framework4 引入了新的Task Parallel Library(任務并行庫,TPL),它支持數(shù)據(jù)并行、任務并行和流水線。讓開發(fā)人員應付不同的并行場合。

  • 數(shù)據(jù)并行:有大量數(shù)據(jù)需要處理,并且必須對每一份數(shù)據(jù)執(zhí)行同樣的操作。比如通過256bit的密鑰對100個Unicode字符串進行AES算法加密。

  • 任務并行:通過任務并發(fā)運行不同的操作。例如生成文件散列碼,加密字符串,創(chuàng)建縮略圖。

  • 流水線:這是任務并行和數(shù)據(jù)并行的結合體。

    TPL引入了System.Threading.Tasks ,主類是Task,這個類表示一個異步的并發(fā)的操作,然而我們不一定要使用Task類的實例,可以使用Parallel靜態(tài)類。它提供了 Parallel.Invoke, Parallel.For Parallel.Forecah 三個方法。

三、Parallel.Invoke

     試圖讓很多方法并行運行的最簡單的方法就是使用Parallel類的Invoke方法。例如有四個方法:

  • WatchMovie

  • HaveDinner

  • ReadBook

  • WriteBlog

    通過下面的代碼就可以使用并行。

 System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);

  這段代碼會創(chuàng)建指向每一個方法的委托。Invoke方法接受一個Action的參數(shù)組。

1

public static void Invoke(params Action[] actions);

  用lambda表達式或匿名委托可以達到同樣的效果。

System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () => ReadBook(), delegate() { WriteBlog(); });

 1.沒有特定的執(zhí)行順序。

   Parallel.Invoke方法只有在4個方法全部完成之后才會返回。它至少需要4個硬件線程才足以讓這4個方法并發(fā)運行。但并不保證這4個方法能夠同時啟動運行,如果一個或者多個內(nèi)核處于繁忙狀態(tài),那么底層的調(diào)度邏輯可能會延遲某些方法的初始化執(zhí)行。

給方法加上延時,就可以看到必須等待最長的方法執(zhí)行完成才回到主方法。

 
 
  1. static void Main(string[] args)
  2.         {
  3.             System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook,
  4.                 WriteBlog);
  5.             Console.WriteLine("執(zhí)行完成");
  6.             Console.ReadKey();
  7.         }
  8.         static void WatchMovie()
  9.         {
  10.             Thread.Sleep(5000);
  11.             Console.WriteLine("看電影");
  12.         }
  13.         static void HaveDinner()
  14.         {
  15.             Thread.Sleep(1000);
  16.             Console.WriteLine("吃晚飯");
  17.         }
  18.         static void ReadBook()
  19.         {
  20.             Thread.Sleep(2000);
  21.             Console.WriteLine("讀書");
  22.         }
  23.         static void WriteBlog()
  24.         {
  25.             Thread.Sleep(3000);
  26.             Console.WriteLine("寫博客");
  27.         }

這樣會造成很多邏輯內(nèi)核處于長時間閑置狀態(tài)。

四、Parallel.For

Parallel.For為固定數(shù)目的獨立For循環(huán)迭代提供了負載均衡 (即將工作分發(fā)到不同的任務中執(zhí)行,這樣所有的任務在大部分時間都可以保持繁忙) 的并行執(zhí)行。從而能盡可能地充分利用所有的可用的內(nèi)核。

我們比較下下面兩個方法,一個使用For循環(huán),一個使用Parallel.For  都是生成密鑰在轉換為十六進制字符串。

  
 
  1. private static void GenerateAESKeys()
  2.         {
  3.             var sw = Stopwatch.StartNew();
  4.             for (int i = 0; i < NUM_AES_KEYS; i++)
  5.             {
  6.                 var aesM = new AesManaged();
  7.                 aesM.GenerateKey();
  8.                 byte[] result = aesM.Key;
  9.                 string hexStr = ConverToHexString(result);
  10.             }
  11.             Console.WriteLine("AES:"+sw.Elapsed.ToString());
  12.         }
  13.  private static void ParallelGenerateAESKeys()
  14.         {
  15.             var sw = Stopwatch.StartNew();
  16.             System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) =>
  17.             {
  18.                 var aesM = new AesManaged();
  19.                 aesM.GenerateKey();
  20.                 byte[] result = aesM.Key;
  21.                 string hexStr = ConverToHexString(result);
  22.             });
  23.             Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString());
  24.         }

  private static int NUM_AES_KEYS = 100000;
        static void Main(string[] args)
        {
            Console.WriteLine("執(zhí)行"+NUM_AES_KEYS+"次:"); GenerateAESKeys();
            ParallelGenerateAESKeys();
            Console.ReadKey();
        }

執(zhí)行1000000次

這里并行的時間是串行的一半。

五、Parallel.ForEach

在Parallel.For中,有時候對既有循環(huán)進行優(yōu)化可能會是一個非常復雜的任務。Parallel.ForEach為固定數(shù)目的獨立For Each循環(huán)迭代提供了負載均衡的并行執(zhí)行,且支持自定義分區(qū)器,讓使用者可以完全掌握數(shù)據(jù)分發(fā)。實質(zhì)就是將所有要處理的數(shù)據(jù)區(qū)分為多個部分,然后并行運 行這些串行循環(huán)。

修改上面的代碼:

 
 
  1. System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range =>
  2.             {
  3.                 var aesM = new AesManaged();
  4.                 Console.WriteLine("AES Range({0},{1} 循環(huán)開始時間:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay);
  5.                 for (int i = range.Item1; i < range.Item2; i++)
  6.                 {
  7.                     aesM.GenerateKey();
  8.                     byte[] result = aesM.Key;
  9.                     string hexStr = ConverToHexString(result);
  10.                 }
  11.                 Console.WriteLine("AES:"+sw.Elapsed.ToString());
  12.             });

從執(zhí)行結果可以看出,分了13個段執(zhí)行的。

第二次執(zhí)行還是13個段。速度上稍微有差異。開始沒有指定分區(qū)數(shù),Partitioner.Create使用的是內(nèi)置默認值。

而且我們發(fā)現(xiàn)這些分區(qū)并不是同時執(zhí)行的,大致是分了三個時間段執(zhí)行。而且執(zhí)行順序是不同的??偟臅r間和Parallel.For的方法差不多。

 public static ParallelLoopResult ForEach(Partitioner source, Action body)

Parallel.ForEach方法定義了source和Body兩個參數(shù)。source是指分區(qū)器。提供了分解為多個分區(qū)的數(shù)據(jù)源。body是 要調(diào)用的委托。它接受每一個已定義的分區(qū)作為參數(shù)。一共有20多個重載,在上面的例子中,分區(qū)的類型為Tuple,是一個 二元組類型。此外,返回一個ParallelLoopResult的值。

Partitioner.Create 創(chuàng)建分區(qū)是根據(jù)邏輯內(nèi)核數(shù)及其他因素決定。

 
 
  1. public static OrderablePartitioner> Create(int fromInclusive, int toExclusive)
  2.     {
  3.       int num = 3;
  4.       if (toExclusive <= fromInclusive)
  5.         throw new ArgumentOutOfRangeException("toExclusive");
  6.       int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num);
  7.       if (rangeSize == 0)
  8.         rangeSize = 1;
  9.       return Partitioner.Create>(Partitioner.CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering);
  10.     }

因此我們可以修改分區(qū)數(shù)目,rangesize大致為250000左右。也就是說我的邏輯內(nèi)核是4.

   var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1;
   System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1,rangesize), range =>

再次執(zhí)行:

分區(qū)變成了四個,時間上沒有多大差別(***個時間是串行時間)。我們看見這四個分區(qū)幾乎是同時執(zhí)行的。大部分情況下,TPL在幕后使用的負載均衡機制都是非常高效的,然而對分區(qū)的控制便于使用者對自己的工作負載進行分析,來改進整體的性能。

Parallel.ForEach也能對IEnumerable集合進行重構。Enumerable.Range生產(chǎn)了序列化的數(shù)目。但這樣就沒有上面的分區(qū)效果。

 
 
  1. private static void ParallelForEachGenerateMD5HasHes()
  2.         {
  3.             var sw = Stopwatch.StartNew();
  4.             System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), number =>
  5.             {
  6.                 var md5M = MD5.Create();
  7.                 byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
  8.                 byte[] result = md5M.ComputeHash(data);
  9.                 string hexString = ConverToHexString(result);
  10.             });
  11.             Console.WriteLine("MD5:"+sw.Elapsed.ToString());
  12.         }

#p#

六、從循環(huán)中退出

和串行運行中的break不同,ParallelLoopState 提供了兩個方法用于停止Parallel.For 和 Parallel.ForEach的執(zhí)行。

  • Break:讓循環(huán)在執(zhí)行了當前迭代后盡快停止執(zhí)行。比如執(zhí)行到100了,那么循環(huán)會處理掉所有小于100的迭代。

  • Stop:讓循環(huán)盡快停止執(zhí)行。如果執(zhí)行到了100的迭代,那不能保證處理完所有小于100的迭代。

修改上面的方法:執(zhí)行3秒后退出。

 
 
  1. private static void ParallelLoopResult(ParallelLoopResult loopResult)
  2.         {
  3.             string text;
  4.             if (loopResult.IsCompleted)
  5.             {
  6.                 text = "循環(huán)完成";
  7.             }
  8.             else
  9.             {
  10.                 if (loopResult.LowestBreakIteration.HasValue)
  11.                 {
  12.                     text = "Break終止";
  13.                 }
  14.                 else
  15.                 {
  16.                     text = "Stop 終止";
  17.                 }
  18.             }
  19.             Console.WriteLine(text);
  20.         }
  21.         private static void ParallelForEachGenerateMD5HasHesBreak()
  22.         {
  23.             var sw = Stopwatch.StartNew();
  24.             var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (int number,ParallelLoopState loopState) =>
  25.             {
  26.                 var md5M = MD5.Create();
  27.                 byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
  28.                 byte[] result = md5M.ComputeHash(data);
  29.                 string hexString = ConverToHexString(result);
  30.                 if (sw.Elapsed.Seconds > 3)
  31.                 {
  32.                     loopState.Stop();
  33.                 }
  34.             });
  35.             ParallelLoopResult(loopresult);
  36.             Console.WriteLine("MD5:" + sw.Elapsed);
  37.         }

七、捕捉并行循環(huán)中發(fā)生的異常。

  當并行迭代中調(diào)用的委托拋出異常,這個異常沒有在委托中被捕獲到時,就會變成一組異常,新的System.AggregateException負責處理這一組異常。

 
 
  1. private static void ParallelForEachGenerateMD5HasHesException()
  2.         {
  3.             var sw = Stopwatch.StartNew();
  4.             var loopresult = new ParallelLoopResult();
  5.             try
  6.             {
  7.                 loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (number, loopState) =>
  8.                 {
  9.                     var md5M = MD5.Create();
  10.                     byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
  11.                     byte[] result = md5M.ComputeHash(data);
  12.                     string hexString = ConverToHexString(result);
  13.                     if (sw.Elapsed.Seconds > 3)
  14.                     {
  15.                         throw new TimeoutException("執(zhí)行超過三秒");
  16.                     }
  17.                 });
  18.             }
  19.             catch (AggregateException ex)
  20.             {
  21.                 foreach (var innerEx in  ex.InnerExceptions)
  22.                 {
  23.                     Console.WriteLine(innerEx.ToString());
  24.                 }
  25.             }
  26.            
  27.             ParallelLoopResult(loopresult);
  28.             Console.WriteLine("MD5:" + sw.Elapsed);
  29.         }

結果:

 異常出現(xiàn)了好幾次。

#p#

 八、指定并行度。

TPL的方法總會試圖利用所有可用的邏輯內(nèi)核來實現(xiàn)***的結果,但有時候你并不希望在并行循環(huán)中使用所有的內(nèi)核。比如你需要留出一個不參與并行計算 的內(nèi)核,來創(chuàng)建能夠響應用戶的應用程序,而且這個內(nèi)核需要幫助你運行代碼中的其他部分。這個時候一種好的解決方法就是指定***并行度。

這需要創(chuàng)建一個ParallelOptions的實例,設置MaxDegreeOfParallelism的值。

 
 
  1. private static void ParallelMaxDegree(int maxDegree)
  2.         {
  3.             var parallelOptions = new ParallelOptions();
  4.             parallelOptions.MaxDegreeOfParallelism = maxDegree;
  5.             var sw = Stopwatch.StartNew();
  6.             System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int i) =>
  7.             {
  8.                 var aesM = new AesManaged();
  9.                 aesM.GenerateKey();
  10.                 byte[] result = aesM.Key;
  11.                 string hexStr = ConverToHexString(result);
  12.             });
  13.             Console.WriteLine("AES:" + sw.Elapsed.ToString());
  14.         }

調(diào)用:如果在四核微處理器上運行,那么將使用3個內(nèi)核。

 ParallelMaxDegree(Environment.ProcessorCount - 1);

時間上大致慢了點(***次Parallel.For 3.18s),但可以騰出一個內(nèi)核來處理其他的事情。

小結:這次學習了Parallel相關方法以及如何退出并行循環(huán)和捕獲異常、設置并行度,還有并行相關的知識。園子里也有類似的博客。但作為自己知識的管理,在這里梳理一遍。


網(wǎng)站欄目:詳細的.Net并行編程高級教程--Parallel
文章分享:http://www.5511xx.com/article/ccshdhp.html