日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
一個(gè)進(jìn)程間同步和通訊的C#框架

threadmsg_demo.zip ~ 41KB    下載

站在用戶的角度思考問(wèn)題,與客戶深入溝通,找到貴港網(wǎng)站設(shè)計(jì)與貴港網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類(lèi)型包括:成都做網(wǎng)站、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、國(guó)際域名空間、雅安服務(wù)器托管、企業(yè)郵箱。業(yè)務(wù)覆蓋貴港地區(qū)。

threadmsg_src.zip ~ 65KB    下載

0.背景簡(jiǎn)介

微軟在 .NET 框架中提供了多種實(shí)用的線程同步手段,其中包括 monitor 類(lèi)及 reader-writer鎖。但跨進(jìn)程的同步方法還是非常欠缺。另外,目前也沒(méi)有方便的線程間及進(jìn)程間傳遞消息的方法。例如C/S和SOA,又或者生產(chǎn)者/消費(fèi)者模式中就常常需要傳遞消息。為此我編寫(xiě)了一個(gè)獨(dú)立完整的框架,實(shí)現(xiàn)了跨線程和跨進(jìn)程的同步和通訊。這框架內(nèi)包含了信號(hào)量,信箱,內(nèi)存映射文件,阻塞通道,及簡(jiǎn)單消息流控制器等組件。這篇文章里提到的類(lèi)同屬于一個(gè)開(kāi)源的庫(kù)項(xiàng)目(BSD許可),你可以從這里下載到 www.cdrnet.net/projects/threadmsg/.

這個(gè)框架的目的是:

1.封裝性:通過(guò)MSMQ消息隊(duì)列發(fā)送消息的線程無(wú)需關(guān)心消息是發(fā)送到另一個(gè)線程還是另一臺(tái)機(jī)器。

2.簡(jiǎn)單性:向其他進(jìn)程發(fā)送消息只需調(diào)用一個(gè)方法。

注意:我刪除了本文中全部代碼的XML注釋以節(jié)省空間。如果你想知道這些方法和參數(shù)的詳細(xì)信息,請(qǐng)參考附件中的代碼。

1.先看一個(gè)簡(jiǎn)單例子

使用了這個(gè)庫(kù)后,跨進(jìn)程的消息傳遞將變得非常簡(jiǎn)單。我將用一個(gè)小例子來(lái)作示范:一個(gè)控制臺(tái)程序,根據(jù)參數(shù)可以作為發(fā)送方也可以作為接收方運(yùn)行。在發(fā)送程序里,你可以輸入一定的文本并發(fā)送到信箱內(nèi)(返回key),接收程序?qū)@示所有從信箱內(nèi)收到的消息。你可以運(yùn)行無(wú)數(shù)個(gè)發(fā)送程序和接收程序,但是每個(gè)消息只會(huì)被具體的某一個(gè)接收程序所收到。

 
 
 
 
  1. [Serializable] 
  2. struct Message 
  3.   public string Text; 
  4.  
  5. class Test 
  6.   IMailBox mail; 
  7.  
  8.   public Test() 
  9.   { 
  10.     mail = new ProcessMailBox("TMProcessTest",1024); 
  11.   } 
  12.  
  13.   public void RunWriter() 
  14.   { 
  15.     Console.WriteLine("Writer started"); 
  16.     Message msg; 
  17.     while(true) 
  18.     { 
  19.       msg.Text = Console.ReadLine(); 
  20.       if(msg.Text.Equals("exit")) 
  21.         break; 
  22.       mail.Content = msg; 
  23.     } 
  24.   } 
  25.  
  26.   public void RunReader() 
  27.   { 
  28.     Console.WriteLine("Reader started"); 
  29.     while(true) 
  30.     { 
  31.       Message msg = (Message)mail.Content; 
  32.       Console.WriteLine(msg.Text); 
  33.     } 
  34.   } 
  35.  
  36.   [STAThread] 
  37.   static void Main(string[] args) 
  38.   { 
  39.     Test test = new Test(); 
  40.     if(args.Length > 0) 
  41.       test.RunWriter(); 
  42.     else 
  43.       test.RunReader(); 
  44.   } 

信箱一旦創(chuàng)建之后(這上面代碼里是 ProcessMailBox ),接收消息只需要讀取 Content 屬性,發(fā)送消息只需要給這個(gè)屬性賦值。當(dāng)沒(méi)有數(shù)據(jù)時(shí),獲取消息將會(huì)阻塞當(dāng)前線程;發(fā)送消息時(shí)如果信箱里已經(jīng)有數(shù)據(jù),則會(huì)阻塞當(dāng)前線程。正是有了這個(gè)阻塞,整個(gè)程序是完全基于中斷的,并且不會(huì)過(guò)度占用CPU(不需要進(jìn)行輪詢)。發(fā)送和接收的消息可以是任意支持序列化(Serializable)的類(lèi)型。

然而,實(shí)際上暗地里發(fā)生的事情有點(diǎn)復(fù)雜:消息通過(guò)內(nèi)存映射文件來(lái)傳遞,這是目前唯一的跨進(jìn)程共享內(nèi)存的方法,這個(gè)例子里我們只會(huì)在 pagefile 里面產(chǎn)生虛擬文件。對(duì)這個(gè)虛擬文件的訪問(wèn)是通過(guò) win32 信號(hào)量來(lái)確保同步的。消息首先序列化成二進(jìn)制,然后再寫(xiě)進(jìn)該文件,這就是為什么需要聲明Serializable屬性。內(nèi)存映射文件和 win32 信號(hào)量都需要調(diào)用 NT內(nèi)核的方法。多得了 .NET 框架中的 Marshal 類(lèi),我們可以避免編寫(xiě)不安全的代碼。我們將在下面討論更多的細(xì)節(jié)。

#p#

2. .NET里面的跨線程/進(jìn)程同步

線程/進(jìn)程間的通訊需要共享內(nèi)存或者其他內(nèi)建機(jī)制來(lái)發(fā)送/接收數(shù)據(jù)。即使是采用共享內(nèi)存的方式,也還需要一組同步方法來(lái)允許并發(fā)訪問(wèn)。

同一個(gè)進(jìn)程內(nèi)的所有線程都共享公共的邏輯地址空間(堆)。對(duì)于不同進(jìn)程,從 win2000 開(kāi)始就已經(jīng)無(wú)法共享內(nèi)存。然而,不同的進(jìn)程可以讀寫(xiě)同一個(gè)文件。WinAPI提供了多種系統(tǒng)調(diào)用方法來(lái)映射文件到進(jìn)程的邏輯空間,及訪問(wèn)系統(tǒng)內(nèi)核對(duì)象(會(huì)話)指向的 pagefile 里面的虛擬文件。無(wú)論是共享堆,還是共享文件,并發(fā)訪問(wèn)都有可能導(dǎo)致數(shù)據(jù)不一致。我們就這個(gè)問(wèn)題簡(jiǎn)單討論一下,該怎樣確保線程/進(jìn)程調(diào)用的有序性及數(shù)據(jù)的一致性。

2.1 線程同步

.NET 框架和 C# 提供了方便直觀的線程同步方法,即 monitor 類(lèi)和 lock 語(yǔ)句(本文將不會(huì)討論 .NET 框架的互斥量)。對(duì)于線程同步,雖然本文提供了其他方法,我們還是推薦使用 lock 語(yǔ)句。

 
 
 
 
  1. void Work1() 
  2.   NonCriticalSection1(); 
  3.   Monitor.Enter(this); 
  4.   try 
  5.   { 
  6.     CriticalSection(); 
  7.   } 
  8.   finally 
  9.   { 
  10.     Monitor.Exit(this); 
  11.   } 
  12.   NonCriticalSection2(); 

Work1 和 Work2 是等價(jià)的。在C#里面,很多人喜歡第二個(gè)方法,因?yàn)樗蹋也蝗菀壮鲥e(cuò)。

2.2 跨線程信號(hào)量

信號(hào)量是經(jīng)典的同步基本概念之一(由 Edsger Dijkstra 引入)。信號(hào)量是指一個(gè)有計(jì)數(shù)器及兩個(gè)操作的對(duì)象。它的兩個(gè)操作是:獲取(也叫P或者等待),釋放(也叫V或者收到信號(hào))。信號(hào)量在獲取操作時(shí)如果計(jì)數(shù)器為0則阻塞,否則將計(jì)數(shù)器減一;在釋放時(shí)將計(jì)數(shù)器加一,且不會(huì)阻塞。雖然信號(hào)量的原理很簡(jiǎn)單,但是實(shí)現(xiàn)起來(lái)有點(diǎn)麻煩。好在,內(nèi)建的 monitor 類(lèi)有阻塞特性,可以用來(lái)實(shí)現(xiàn)信號(hào)量。

 
 
 
 
  1. public sealed class ThreadSemaphore : ISemaphore 
  2.   private int counter; 
  3.   private readonly int max; 
  4.  
  5.   public ThreadSemaphore() : this(0, int.Max) {} 
  6.   public ThreadSemaphore(int initial) : this(initial, int.Max) {} 
  7.   public ThreadSemaphore(int initial, int max) 
  8.   { 
  9.     this.counter = Math.Min(initial,max); 
  10.     this.max = max; 
  11.   } 
  12.  
  13.   public void Acquire() 
  14.   { 
  15.     lock(this) 
  16.     { 
  17.       counter--; 
  18.       if(counter < 0 && !Monitor.Wait(this)) 
  19.         throw new SemaphoreFailedException(); 
  20.     } 
  21.   } 
  22.  
  23.   public void Acquire(TimeSpan timeout) 
  24.   { 
  25.     lock(this) 
  26.     { 
  27.       counter--; 
  28.       if(counter < 0 && !Monitor.Wait(this,timeout)) 
  29.         throw new SemaphoreFailedException(); 
  30.     } 
  31.   } 
  32.  
  33.   public void Release() 
  34.   { 
  35.     lock(this) 
  36.     { 
  37.       if(counter >= max) 
  38.         throw new SemaphoreFailedException(); 
  39.       if(counter < 0) 
  40.         Monitor.Pulse(this); 
  41.       counter++; 
  42.     } 
  43.   } 

信號(hào)量在復(fù)雜的阻塞情景下更加有用,例如我們后面將要討論的通道(channel)。你也可以使用信號(hào)量來(lái)實(shí)現(xiàn)臨界區(qū)的排他性(如下面的 Work3),但是我還是推薦使用內(nèi)建的 lock 語(yǔ)句,像上面的 Work2 那樣。

請(qǐng)注意:如果使用不當(dāng),信號(hào)量也是有潛在危險(xiǎn)的。正確的做法是:當(dāng)獲取信號(hào)量失敗時(shí),千萬(wàn)不要再調(diào)用釋放操作;當(dāng)獲取成功時(shí),無(wú)論發(fā)生了什么錯(cuò)誤,都要記得釋放信號(hào)量。遵循這樣的原則,你的同步才是正確的。Work3 中的 finally 語(yǔ)句就是為了保證正確釋放信號(hào)量。注意:獲取信號(hào)量( s.Acquire() )的操作必須放到 try 語(yǔ)句的外面,只有這樣,當(dāng)獲取失敗時(shí)才不會(huì)調(diào)用釋放操作。

 
 
 
 
  1. ThreadSemaphore s = new ThreadSemaphore(1); 
  2. void Work3() 
  3.   NonCriticalSection1(); 
  4.   s.Acquire(); 
  5.   try 
  6.   { 
  7.     CriticalSection(); 
  8.   } 
  9.   finally 
  10.   { 
  11.     s.Release(); 
  12.   } 
  13.   NonCriticalSection2(); 

2.3 跨進(jìn)程信號(hào)量

為了協(xié)調(diào)不同進(jìn)程訪問(wèn)同一資源,我們需要用到上面討論過(guò)的概念。很不幸,.NET 中的 monitor 類(lèi)不可以跨進(jìn)程使用。但是,win32 API提供的內(nèi)核信號(hào)量對(duì)象可以用來(lái)實(shí)現(xiàn)跨進(jìn)程同步。 Robin Galloway-Lunn 介紹了怎樣將 win32 的信號(hào)量映射到 .NET 中(見(jiàn) Using Win32 Semaphores in C# )。我們的實(shí)現(xiàn)也類(lèi)似:

 
 
 
 
  1. [DllImport("kernel32",EntryPoint="CreateSemaphore", 
  2.      SetLastError=true,CharSet=CharSet.Unicode)] 
  3. internal static extern uint CreateSemaphore( 
  4.   SecurityAttributes auth, int initialCount, 
  5.     int maximumCount, string name); 
  6.  
  7. [DllImport("kernel32",EntryPoint="WaitForSingleObject", 
  8.  SetLastError=true,CharSet=CharSet.Unicode)] 
  9. internal static extern uint WaitForSingleObject( 
  10.  uint hHandle, uint dwMilliseconds); 
  11.  
  12. [DllImport("kernel32",EntryPoint="ReleaseSemaphore", 
  13.  SetLastError=true,CharSet=CharSet.Unicode)] 
  14. [return : MarshalAs( UnmanagedType.VariantBool )] 
  15. internal static extern bool ReleaseSemaphore( 
  16.   uint hHandle, int lReleaseCount, out int lpPreviousCount); 
  17.      
  18. [DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true, 
  19.   CharSet=CharSet.Unicode)] 
  20. [return : MarshalAs( UnmanagedType.VariantBool )] 
  21. internal static extern bool CloseHandle(uint hHandle); 
 
 
 
 
  1. public class ProcessSemaphore : ISemaphore, IDisposable 
  2.   private uint handle; 
  3.   private readonly uint interruptReactionTime; 
  4.  
  5.   public ProcessSemaphore(string name) : this( 
  6.    name,0,int.MaxValue,500) {} 
  7.   public ProcessSemaphore(string name, int initial) : this( 
  8.    name,initial,int.MaxValue,500) {} 
  9.   public ProcessSemaphore(string name, int initial, 
  10.    int max, int interruptReactionTime) 
  11.   {        
  12.     this.interruptReactionTime = (uint)interruptReactionTime; 
  13.     this.handle = NTKernel.CreateSemaphore(null, initial, max, name); 
  14.     if(handle == 0) 
  15.       throw new SemaphoreFailedException(); 
  16.   } 
  17.  
  18.   public void Acquire() 
  19.   { 
  20.     while(true) 
  21.     { //looped 0.5s timeout to make NT-blocked threads interruptable. 
  22.       uint res = NTKernel.WaitForSingleObject(handle,  
  23.        interruptReactionTime); 
  24.       try {System.Threading.Thread.Sleep(0);}  
  25.       catch(System.Threading.ThreadInterruptedException e) 
  26.       { 
  27.         if(res == 0) 
  28.         { //Rollback  
  29.           int previousCount; 
  30.           NTKernel.ReleaseSemaphore(handle,1,out previousCount); 
  31.         } 
  32.         throw e; 
  33.       } 
  34.       if(res == 0) 
  35.         return; 
  36.       if(res != 258) 
  37.         throw new SemaphoreFailedException(); 
  38.     } 
  39.   } 
  40.  
  41.   public void Acquire(TimeSpan timeout) 
  42.   { 
  43.     uint milliseconds = (uint)timeout.TotalMilliseconds; 
  44.     if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0) 
  45.       throw new SemaphoreFailedException();   
  46.   } 
  47.  
  48.   public void Release() 
  49.   { 
  50.     int previousCount; 
  51.     if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount)) 
  52.       throw new SemaphoreFailedException();   
  53.   } 
  54.  
  55.   #region IDisposable Member 
  56.   public void Dispose() 
  57.   { 
  58.     if(handle != 0) 
  59.     { 
  60.       if(NTKernel.CloseHandle(handle)) 
  61.         handle = 0; 
  62.     } 
  63.   } 
  64.   #endregion 

有一點(diǎn)很重要:win32中的信號(hào)量是可以命名的。這允許其他進(jìn)程通過(guò)名字來(lái)創(chuàng)建相應(yīng)信號(hào)量的句柄。為了讓阻塞線程可以中斷,我們使用了一個(gè)(不好)的替代方法:使用超時(shí)和 Sleep(0)。我們需要中斷來(lái)安全關(guān)閉線程。更好的做法是:確定沒(méi)有線程阻塞之后才釋放信號(hào)量,這樣程序才可以完全釋放資源并正確退出。

你可能也注意到了:跨線程和跨進(jìn)程的信號(hào)量都使用了相同的接口。所有相關(guān)的類(lèi)都使用了這種模式,以實(shí)現(xiàn)上面背景介紹中提到的封閉性。需要注意:出于性能考慮,你不應(yīng)該將跨進(jìn)程的信號(hào)量用到跨線程的場(chǎng)景,也不應(yīng)該將跨線程的實(shí)現(xiàn)用到單線程的場(chǎng)景。

#p#

3. 跨進(jìn)程共享內(nèi)存:內(nèi)存映射文件

我們已經(jīng)實(shí)現(xiàn)了跨線程和跨進(jìn)程的共享資源訪問(wèn)同步。但是傳遞/接收消息還需要共享資源。對(duì)于線程來(lái)說(shuō),只需要聲明一個(gè)類(lèi)成員變量就可以了。但是對(duì)于跨進(jìn)程來(lái)說(shuō),我們需要使用到 win32 API 提供的內(nèi)存映射文件(Memory Mapped Files,簡(jiǎn)稱MMF)。使用 MMF和使用 win32 信號(hào)量差不多。我們需要先調(diào)用 CreateFileMapping 方法來(lái)創(chuàng)建一個(gè)內(nèi)存映射文件的句柄:

 
 
 
 
  1. [DllImport("Kernel32.dll",EntryPoint="CreateFileMapping", 
  2.      SetLastError=true,CharSet=CharSet.Unicode)] 
  3. internal static extern IntPtr CreateFileMapping(uint hFile,  
  4.  SecurityAttributes lpAttributes, uint flProtect, 
  5.   uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName); 
  6.      
  7. [DllImport("Kernel32.dll",EntryPoint="MapViewOfFile", 
  8.  SetLastError=true,CharSet=CharSet.Unicode)] 
  9. internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject,  
  10.   uint dwDesiredAccess, uint dwFileOffsetHigh, 
  11.   uint dwFileOffsetLow, uint dwNumberOfBytesToMap); 
  12.      
  13. [DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile", 
  14.  SetLastError=true,CharSet=CharSet.Unicode)] 
  15. [return : MarshalAs( UnmanagedType.VariantBool )] 
  16. internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress); 
 
 
 
 
  1. public static MemoryMappedFile CreateFile(string name,  
  2.      FileAccess access, int size) 
  3.   if(size < 0) 
  4.     throw new ArgumentException("Size must not be negative","size"); 
  5.  
  6.   IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null, 
  7.    (uint)access,0,(uint)size,name); 
  8.   if(fileMapping == IntPtr.Zero) 
  9.     throw new MemoryMappingFailedException(); 
  10.  
  11.   return new MemoryMappedFile(fileMapping,size,access); 

我們希望直接使用 pagefile 中的虛擬文件,所以我們用 -1(0xFFFFFFFF) 來(lái)作為文件句柄來(lái)創(chuàng)建我們的內(nèi)存映射文件句柄。我們也指定了必填的文件大小,以及相應(yīng)的名稱。這樣其他進(jìn)程就可以通過(guò)這個(gè)名稱來(lái)同時(shí)訪問(wèn)該映射文件。創(chuàng)建了內(nèi)存映射文件后,我們就可以映射這個(gè)文件不同的部分(通過(guò)偏移量和字節(jié)大小來(lái)指定)到我們的進(jìn)程地址空間。我們通過(guò) MapViewOfFile 系統(tǒng)方法來(lái)指定:

 
 
 
 
  1. public MemoryMappedFileView CreateView(int offset, int size, 
  2.       MemoryMappedFileView.ViewAccess access) 
  3.   if(this.access == FileAccess.ReadOnly && access ==  
  4.     MemoryMappedFileView.ViewAccess.ReadWrite) 
  5.     throw new ArgumentException( 
  6.      "Only read access to views allowed on files without write access", 
  7.      "access"); 
  8.   if(offset < 0) 
  9.     throw new ArgumentException("Offset must not be negative","size"); 
  10.   if(size < 0) 
  11.     throw new ArgumentException("Size must not be negative","size"); 
  12.   IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping, 
  13.    (uint)access,0,(uint)offset,(uint)size); 
  14.   return new MemoryMappedFileView(mappedView,size,access); 

在不安全的代碼中,我們可以將返回的指針強(qiáng)制轉(zhuǎn)換成我們指定的類(lèi)型。盡管如此,我們不希望有不安全的代碼存在,所以我們使用 Marshal 類(lèi)來(lái)從中讀寫(xiě)我們的數(shù)據(jù)。偏移量參數(shù)是用來(lái)從哪里開(kāi)始讀寫(xiě)數(shù)據(jù),相對(duì)于指定的映射視圖的地址。

 
 
 
 
  1. public byte ReadByte(int offset) 
  2.   return Marshal.ReadByte(mappedView,offset); 
  3. public void WriteByte(byte data, int offset) 
  4.   Marshal.WriteByte(mappedView,offset,data); 
  5.  
  6. public int ReadInt32(int offset) 
  7.   return Marshal.ReadInt32(mappedView,offset); 
  8. public void WriteInt32(int data, int offset) 
  9.   Marshal.WriteInt32(mappedView,offset,data); 
  10.  
  11. public void ReadBytes(byte[] data, int offset) 
  12.   for(int i=0;i
  13.     data[i] = Marshal.ReadByte(mappedView,offset+i); 
  14. public void WriteBytes(byte[] data, int offset) 
  15.   for(int i=0;i
  16.     Marshal.WriteByte(mappedView,offset+i,data[i]); 

但是,我們希望讀寫(xiě)整個(gè)對(duì)象樹(shù)到文件中,所以我們需要支持自動(dòng)進(jìn)行序列化和反序列化的方法。

 
 
 
 
  1. public object ReadDeserialize(int offset, int length) 
  2.   byte[] binaryData = new byte[length]; 
  3.   ReadBytes(binaryData,offset); 
  4.   System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter 
  5.     = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); 
  6.   System.IO.MemoryStream ms = new System.IO.MemoryStream( 
  7.    binaryData,0,length,true,true); 
  8.   object data = formatter.Deserialize(ms); 
  9.   ms.Close(); 
  10.   return data; 
  11. public void WriteSerialize(object data, int offset, int length) 
  12. System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter 
  13.     = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); 
  14.   byte[] binaryData = new byte[length]; 
  15.   System.IO.MemoryStream ms = new System.IO.MemoryStream( 
  16.    binaryData,0,length,true,true); 
  17.   formatter.Serialize(ms,data); 
  18.   ms.Flush(); 
  19.   ms.Close(); 
  20.   WriteBytes(binaryData,offset); 

請(qǐng)注意:對(duì)象序列化之后的大小不應(yīng)該超過(guò)映射視圖的大小。序列化之后的大小總是比對(duì)象本身占用的內(nèi)存要大的。我沒(méi)有試過(guò)直接將對(duì)象內(nèi)存流綁定到映射視圖,那樣做應(yīng)該也可以,甚至可能帶來(lái)少量的性能提升。

#p#

4. 信箱:在線程/進(jìn)程間傳遞消息

這里的信箱與 Email 及 NT 中的郵件槽(Mailslots)無(wú)關(guān)。它是一個(gè)只能保留一個(gè)對(duì)象的安全共享內(nèi)存結(jié)構(gòu)。信箱的內(nèi)容通過(guò)一個(gè)屬性來(lái)讀寫(xiě)。如果信箱內(nèi)容為空,試圖讀取該信箱的線程將會(huì)阻塞,直到另一個(gè)線程往其中寫(xiě)內(nèi)容。如果信箱已經(jīng)有了內(nèi)容,當(dāng)一個(gè)線程試圖往其中寫(xiě)內(nèi)容時(shí)將被阻塞,直到另一個(gè)線程將信箱內(nèi)容讀取出去。信箱的內(nèi)容只能被讀取一次,它的引用在讀取后自動(dòng)被刪除?;谏厦娴拇a,我們已經(jīng)可以實(shí)現(xiàn)信箱了。

4.1 跨線程的信箱

我們可以使用兩個(gè)信號(hào)量來(lái)實(shí)現(xiàn)一個(gè)信箱:一個(gè)信號(hào)量在信箱內(nèi)容為空時(shí)觸發(fā),另一個(gè)在信箱有內(nèi)容時(shí)觸發(fā)。在讀取內(nèi)容之前,線程先等待信箱已經(jīng)填充了內(nèi)容,讀取之后觸發(fā)空信號(hào)量。在寫(xiě)入內(nèi)容之前,線程先等待信箱內(nèi)容清空,寫(xiě)入之后觸發(fā)滿信號(hào)量。注意:空信號(hào)量在一開(kāi)始時(shí)就被觸發(fā)了。

 
 
 
 
  1. public sealed class ThreadMailBox : IMailBox 
  2.   private object content; 
  3.   private ThreadSemaphore empty, full; 
  4.  
  5.   public ThreadMailBox() 
  6.   { 
  7.     empty = new ThreadSemaphore(1,1); 
  8.     full = new ThreadSemaphore(0,1); 
  9.   } 
  10.  
  11.   public object Content 
  12.   { 
  13.     get 
  14.     { 
  15.       full.Acquire(); 
  16.       object item = content; 
  17.       empty.Release(); 
  18.       return item; 
  19.     } 
  20.     set  
  21.     { 
  22.       empty.Acquire(); 
  23.       content = value; 
  24.       full.Release(); 
  25.     } 
  26.   } 

4.2  跨進(jìn)程信箱

跨進(jìn)程信箱與跨線程信箱的實(shí)現(xiàn)基本上一樣簡(jiǎn)單。不同的是我們使用兩個(gè)跨進(jìn)程的信號(hào)量,并且我們使用內(nèi)存映射文件來(lái)代替類(lèi)成員變量。由于序列化可能會(huì)失敗,我們使用了一小段異常處理來(lái)回滾信箱的狀態(tài)。失敗的原因有很多(無(wú)效句柄,拒絕訪問(wèn),文件大小問(wèn)題,Serializable屬性缺失等等)。

 
 
 
 
  1. public sealed class ProcessMailBox : IMailBox, IDisposable 
  2.   private MemoryMappedFile file; 
  3.   private MemoryMappedFileView view; 
  4.   private ProcessSemaphore empty, full; 
  5.  
  6.   public ProcessMailBox(string name,int size) 
  7.   { 
  8.     empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1); 
  9.     full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1); 
  10.     file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox", 
  11.       MemoryMappedFile.FileAccess.ReadWrite,size); 
  12.     view = file.CreateView(0,size, 
  13.      MemoryMappedFileView.ViewAccess.ReadWrite); 
  14.   } 
  15.  
  16.   public object Content 
  17.   { 
  18.     get 
  19.     { 
  20.       full.Acquire(); 
  21.       object item; 
  22.       try {item = view.ReadDeserialize();} 
  23.       catch(Exception e) 
  24.       {  //Rollback 
  25.         full.Release(); 
  26.         throw e; 
  27.       } 
  28.       empty.Release(); 
  29.       return item; 
  30.     } 
  31.  
  32.     set  
  33.     { 
  34.       empty.Acquire(); 
  35.       try {view.WriteSerialize(value);} 
  36.       catch(Exception e) 
  37.       {  //Rollback 
  38.         empty.Release(); 
  39.         throw e; 
  40.       } 
  41.       full.Release(); 
  42.     } 
  43.   } 
  44.  
  45.   #region IDisposable Member 
  46.   public void Dispose() 
  47.   { 
  48.     view.Dispose(); 
  49.     file.Dispose(); 
  50.     empty.Dispose(); 
  51.     full.Dispose(); 
  52.   } 
  53.   #endregion 

到這里我們已經(jīng)實(shí)現(xiàn)了跨進(jìn)程消息傳遞(IPC)所需要的組件。你可能需要再回頭本文開(kāi)頭的那個(gè)例子,看看 ProcessMailBox 應(yīng)該如何使用

#p#

5.通道:基于隊(duì)列的消息傳遞

信箱最大的限制是它們每次只能保存一個(gè)對(duì)象。如果一系列線程(使用同一個(gè)信箱)中的一個(gè)線程需要比較長(zhǎng)的時(shí)間來(lái)處理特定的命令,那么整個(gè)系列都會(huì)阻塞。通常我們會(huì)使用緩沖的消息通道來(lái)處理,這樣你可以在方便的時(shí)候從中讀取消息,而不會(huì)阻塞消息發(fā)送者。這種緩沖通過(guò)通道來(lái)實(shí)現(xiàn),這里的通道比信箱要復(fù)雜一些。同樣,我們將分別從線程和進(jìn)程級(jí)別來(lái)討論通道的實(shí)現(xiàn)。

5.1 可靠性

信箱和通道的另一個(gè)重要的不同是:通道擁有可靠性。例如:自動(dòng)將發(fā)送失敗(可能由于線程等待鎖的過(guò)程中被中斷)的消息轉(zhuǎn)存到一個(gè)內(nèi)置的容器中。這意味著處理通道的線程可以安全地停止,同時(shí)不會(huì)丟失隊(duì)列中的消息。這通過(guò)兩個(gè)抽象類(lèi)來(lái)實(shí)現(xiàn), ThreadReliability 和 ProcessReliability。每個(gè)通道的實(shí)現(xiàn)類(lèi)都繼承其中的一個(gè)類(lèi)。

5.2 跨線程的通道

跨線程的通道基于信箱來(lái)實(shí)現(xiàn),但是使用一個(gè)同步的隊(duì)列來(lái)作為消息緩沖而不是一個(gè)變量。得益于信號(hào)量,通道在空隊(duì)列時(shí)阻塞接收線程,在隊(duì)列滿時(shí)阻塞發(fā)送線程。這樣你就不會(huì)碰到由入隊(duì)/出隊(duì)引發(fā)的錯(cuò)誤。為了實(shí)現(xiàn)這個(gè)效果,我們用隊(duì)列大小來(lái)初始化空信號(hào)量,用0來(lái)初始化滿信號(hào)量。如果某個(gè)發(fā)送線程在等待入隊(duì)的時(shí)候被中斷,我們將消息復(fù)制到內(nèi)置容器中,并將異常往外面拋。在接收操作中,我們不需要做異常處理,因?yàn)榧词咕€程被中斷你也不會(huì)丟失任何消息。注意:線程只有在阻塞狀態(tài)才能被中斷,就像調(diào)用信號(hào)量的獲取操作(Aquire)方法時(shí)。

 
 
 
 
  1. public sealed class ThreadChannel : ThreadReliability, IChannel 
  2.   private Queue queue; 
  3.   private ThreadSemaphore empty, full; 
  4.  
  5.   public ThreadChannel(int size) 
  6.   { 
  7.     queue = Queue.Synchronized(new Queue(size)); 
  8.     empty = new ThreadSemaphore(size,size); 
  9.     full = new ThreadSemaphore(0,size); 
  10.   } 
  11.  
  12.   public void Send(object item) 
  13.   { 
  14.     try {empty.Acquire();} 
  15.     catch(System.Threading.ThreadInterruptedException e) 
  16.     { 
  17.       DumpItem(item); 
  18.       throw e; 
  19.     } 
  20.     queue.Enqueue(item); 
  21.     full.Release(); 
  22.   } 
  23.  
  24.   public void Send(object item, TimeSpan timeout) 
  25.   { 
  26.     try {empty.Acquire(timeout);} 
  27.     ... 
  28.   } 
  29.  
  30.   public object Receive() 
  31.   { 
  32.     full.Acquire(); 
  33.     object item = queue.Dequeue(); 
  34.     empty.Release(); 
  35.     return item; 
  36.   } 
  37.  
  38.   public object Receive(TimeSpan timeout) 
  39.   { 
  40.     full.Acquire(timeout); 
  41.     ... 
  42.   } 
  43.    
  44.   protected override void DumpStructure() 
  45.   { 
  46.     lock(queue.SyncRoot) 
  47.     { 
  48.       foreach(object item in queue) 
  49.         DumpItem(item); 
  50.       queue.Clear(); 
  51.     } 
  52.   } 

5.3 跨進(jìn)程通道

實(shí)現(xiàn)跨進(jìn)程通道有點(diǎn)麻煩,因?yàn)槟阈枰紫忍峁┮粋€(gè)跨進(jìn)程的緩沖區(qū)。一個(gè)可能的解決方法是使用跨進(jìn)程信箱并根據(jù)需要將接收/發(fā)送方法加入隊(duì)列。為了避免這種方案的幾個(gè)缺點(diǎn),我們將直接使用內(nèi)存映射文件來(lái)實(shí)現(xiàn)一個(gè)隊(duì)列。MemoryMappedArray 類(lèi)將內(nèi)存映射文件分成幾部分,可以直接使用數(shù)組索引來(lái)訪問(wèn)。 MemoryMappedQueue 類(lèi),為這個(gè)數(shù)組提供了一個(gè)經(jīng)典的環(huán)(更多細(xì)節(jié)請(qǐng)查看附件中的代碼)。為了支持直接以 byte/integer 類(lèi)型訪問(wèn)數(shù)據(jù)并同時(shí)支持二進(jìn)制序列化,調(diào)用方需要先調(diào)用入隊(duì)(Enqueue)/出隊(duì)(Dequeue)操作,然后根據(jù)需要使用讀寫(xiě)方法(隊(duì)列會(huì)自動(dòng)將數(shù)據(jù)放到正確的位置)。這兩個(gè)類(lèi)都不是線程和進(jìn)程安全的,所以我們需要使用跨進(jìn)程的信號(hào)量來(lái)模擬互斥量(也可以使用 win32 互斥量),以此實(shí)現(xiàn)相互間的互斥訪問(wèn)。除了這兩個(gè)類(lèi),跨進(jìn)程的通道基本上和跨線程信箱一樣。同樣,我們也需要在 Send() 中處理線程中斷及序列化可能失敗的問(wèn)題。

 
 
 
 
  1. public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable 
  2.   private MemoryMappedFile file; 
  3.   private MemoryMappedFileView view; 
  4.   private MemoryMappedQueue queue; 
  5.   private ProcessSemaphore empty, full, mutex; 
  6.  
  7.   public ProcessChannel( int size, string name, int maxBytesPerEntry) 
  8.   { 
  9.     int fileSize = 64+size*maxBytesPerEntry; 
  10.  
  11.     empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size); 
  12.     full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size); 
  13.     mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1); 
  14.     file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel", 
  15.       MemoryMappedFile.FileAccess.ReadWrite,fileSize); 
  16.     view = file.CreateView(0,fileSize, 
  17.      MemoryMappedFileView.ViewAccess.ReadWrite); 
  18.     queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0); 
  19.     if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry) 
  20.       throw new MemoryMappedArrayFailedException(); 
  21.   } 
  22.  
  23.   public void Send(object item) 
  24.   { 
  25.     try {empty.Acquire();} 
  26.     catch(System.Threading.ThreadInterruptedException e) 
  27.     { 
  28.       DumpItemSynchronized(item); 
  29.       throw e; 
  30.     } 
  31.     try {mutex.Acquire();} 
  32.     catch(System.Threading.ThreadInterruptedException e) 
  33.     { 
  34.       DumpItemSynchronized(item); 
  35.       empty.Release(); 
  36.       throw e; 
    網(wǎng)頁(yè)標(biāo)題:一個(gè)進(jìn)程間同步和通訊的C#框架
    URL分享:http://www.5511xx.com/article/dhopced.html