【转】.Net中的异步编程总结
一直以來很想梳理下我在開發(fā)過程中使用異步編程的心得和體會,但是由于我是APM異步編程模式的死忠,當(dāng)TAP模式和TPL模式出現(xiàn)的時候我并未真正的去接納這兩種模式,所以導(dǎo)致我一直沒有花太多心思去整理這兩部分異步編程模型。今天在CodeProject上面偶然間看到一篇關(guān)于異步編程的文章,概括總結(jié)的非常好,省去了我自己寫的麻煩,索性翻譯過來,以饗各位。
??? ??在阻塞和并行編程過程中,異步和多線程是非常重要的特性。異步編程可以涉及到多線程,也可以不用涉及。如果我們能夠把二者放到一起來講解的話,我們會理解的更快一些。所以今天在這里,我主要想講解的內(nèi)容是:
首先來說下異步編程
所謂的異步編程就是指 當(dāng)前的操作獨(dú)立于主操作流程。在C#編程中,我們總是從進(jìn)入Main方法開始 ,Main方法返回結(jié)束。在開始和結(jié)束之間的所有操作,都會挨個的執(zhí)行,下一個操作必須等到上一個操作完畢才開始執(zhí)行,我們看看如下代碼:
static void Main(string[] args){DoTaskOne();DoTaskTwo();}“DoTaskOne”方法必須在“DoTaskTwo”方法之前執(zhí)行。也就是發(fā)生了阻塞現(xiàn)象。
但是在異步編程中,方法的調(diào)用并不會阻塞主線程。當(dāng)方法被調(diào)用后,主線程仍會繼續(xù)執(zhí)行其他的任務(wù)。這種情形,一般都是使用Thread或者是Task來實現(xiàn)的。
在上面的場景中,如果我們讓方法“DoTaskOne”異步執(zhí)行,那么主線程將會立即返回,然后執(zhí)行“DoTaskTwo”方法。
在.NET中,我們可以利用Thread類或者異步模式來創(chuàng)建我們自己的線程來實現(xiàn)異步編程。在.NET中有三種不同的異步模型:
1.APM模型
2.EAP模型
但是,遺憾的是,上面的這兩種模型都不是微軟所推薦的,所以我們將不對其做具體的討論。如果你對這兩種模型感興趣,請移步:
https://msdn.microsoft.com/en-us/library/ms228963(v=vs.110).aspx
https://msdn.microsoft.com/en-us/library/ms228969(v=vs.110).aspx
3.TAP模型:這是微軟所推薦的,我們稍后將會詳細(xì)的進(jìn)行講解。
我們是否需要啟用多線程
如果我們在.NET 4.5版本中使用異步編程模型,絕大部分情況下我們無需手動創(chuàng)建線程。因為編譯器已經(jīng)為我們做了足夠多的工作了。
創(chuàng)建一個新線程不僅耗費(fèi)資源而且花費(fèi)時間,除非我們真的需要去控制一個多線程,否則是不需要去創(chuàng)建的。因為TAP模型和TPL模型已經(jīng)為異步編程和并行編程提供了絕好的支持。TAP模型和TPL模型使用Task來進(jìn)行操作,而Task類使用線程池中的線程來完成操作的。
Task可以在如下場景中運(yùn)行:
在我們使用Task過程中,我們無需擔(dān)心線程的創(chuàng)建或者是使用,因為.NET framework已經(jīng)為我們處理好了各種細(xì)節(jié)。但是如果我們需要控制線程的話,比如說以下的場景:
那么我們不得不使用Thread類來控制。
async和await關(guān)鍵字
.NET framework引入了兩個新的關(guān)鍵字“async”和“await”來執(zhí)行異步編程。當(dāng)在方法上使用await關(guān)鍵字的時候,我們需要同時使用async關(guān)鍵字。await關(guān)鍵字是在調(diào)用異步方法之前被使用的。它主要是使方法的后續(xù)執(zhí)行編程異步的,例如:
private asyncstatic void CallerWithAsync()// async modifier is used {// await is used before a method call. It suspends //execution of CallerWithAsync() method and control returs to the calling thread that can//perform other task.string result = await GetSomethingAsync();// this line would not be executed before GetSomethingAsync() //method completesConsole.WriteLine(result); }在這里,async關(guān)鍵字只能夠被那些返回Task或者是void的方法所使用。它不能被用于Main入口方法上。
我們不能夠在所有的方法上使用await關(guān)鍵字,因為一旦方法上有了await關(guān)鍵字,那么我們的返回類型就變成了“awaitable “類型。下面的幾種類型是” awaitable “的:
TAP模型
首先,我們需要一個能夠返回Task或者Task<T>的異步方法。我們可以通過如下方式來創(chuàng)建Task:
Task.Factory.StartNew還有一些高級應(yīng)用場景,請移步:http://blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx
下面的鏈接則展示了創(chuàng)建Task的幾種方式:?
http://dotnetcodr.com/2014/01/01/5-ways-to-start-a-task-in-net-c/
?
Task的創(chuàng)建和等待
我們接下來將會利用Task.Run<T>方法來創(chuàng)建我們自己的Task。它會將某個特殊的方法放入位于ThreadPool的執(zhí)行隊列中,然后會為這些方法返回一個task句柄。下面的步驟將會為你展示我們?nèi)绾螢橐粋€同步方法創(chuàng)建異步的Task的:
?
?
static Task<string> GreetingAsync(string name) {return Task.Run<string>(() =>{return Greeting(name);}); }?
?
現(xiàn)在我們可以使用await關(guān)鍵字來調(diào)用GreetingAsync方法
?
?
private asyncstatic void CallWithAsync(){//some other tasksstring result = awaitGreetingAsync("Bulbul");//We can add multiple ‘a(chǎn)wait’ in same ‘a(chǎn)sync’ method//string result1 = await GreetingAsync(“Ahmed”);//string result2 = await GreetingAsync(“Every Body”);Console.WriteLine(result);}?
?
當(dāng)“CallWithAsync”方法被調(diào)用的時候,它首先像正常的同步方法被調(diào)用,直到遇到await關(guān)鍵字。當(dāng)它遇到await關(guān)鍵字的時候,它會暫定方法的執(zhí)行,然后等待“GreetingAsync(“Bulbul”)”方法執(zhí)行完畢。在此期間,這個控制流程會返回到“CallWithAsync”方法上,然后調(diào)用者就可以做其他的任務(wù)了。
當(dāng)“GreetingAsync(“Bulbul”)”方法執(zhí)行完畢以后,“CallWithAsync”方法就會喚醒在await關(guān)鍵字后面的其他的方法,所以在這里它會繼續(xù)執(zhí)行“Console.WriteLine(result)”方法。
當(dāng)我們使用ContinueWith方法的時候,我們無需使用await關(guān)鍵字。因為編譯器會自動的將await關(guān)鍵字放到正確的位置上。
等待多個異步方法
讓我們先看下面的代碼:
private asyncstatic void CallWithAsync(){string result = awaitGreetingAsync("Bulbul");string result1 = awaitGreetingAsync("Ahmed");Console.WriteLine(result);Console.WriteLine(result1);}在這里,我們在順序的等待兩個方法被執(zhí)行。GreetingAsync("Ahmed")方法將會在GreetingAsync("Bulbul")執(zhí)行后,再執(zhí)行。但是,如果result和result1彼此不是獨(dú)立的,那么await關(guān)鍵字這樣用是不合適的。
在上面的場景中,我們其實是無需使用await關(guān)鍵字的。所以方法可以被改成如下的樣子:
private async static void MultipleAsyncMethodsWithCombinators(){Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await Task.WhenAll(t1, t2);Console.WriteLine("Finished both methods.\n " +"Result 1: {0}\n Result 2: {1}", t1.Result, t2.Result);}在這里我們用了Task.WhenAll方法,它主要是等待所有的Task都完成工作后再觸發(fā)。Task類還有另一個方法:WhenAny,它主要是等待任何一個Task完成就會觸發(fā)。
異常處理
當(dāng)進(jìn)行錯誤處理的時候,我們不得不將await關(guān)鍵字放到try代碼塊中。
private asyncstatic void CallWithAsync() {try{string result = awaitGreetingAsync("Bulbul");}catch (Exception ex){Console.WriteLine(“handled {0}”, ex.Message);} }但是,如果我們有多個await關(guān)鍵字存在于try代碼快中,那么只有第一個錯誤被處理,因為第二個await是無法被執(zhí)行的。如果我們想要所有的方法都被執(zhí)行,甚至當(dāng)其中一個拋出異常的時候,我們不能使用await關(guān)鍵字來調(diào)用他們,我們需要使用Task.WhenAll方法來等待所有的task執(zhí)行。
private asyncstatic void CallWithAsync(){try{Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await Task.WhenAll(t1, t2);}catch (Exception ex){Console.WriteLine(“handled {0}”, ex.Message);}}盡管所有的任務(wù)都會完成,但是我們可以從第一個task那里看到錯誤。雖然它不是第一個拋出錯誤的,但是它是列表中的第一個任務(wù)。
如果想要從所有的任務(wù)中獲取錯誤,那么有一個方式就是將其在try代碼塊外部進(jìn)行聲明。然后檢查Task方法的“IsFaulted”屬性。如果有錯誤拋出,那么其“IsFaulted”屬性為true。
示例如下:
static async void ShowAggregatedException(){Task taskResult = null;try {Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await (taskResult = Task.WhenAll(t1, t2));}catch (Exception ex){Console.WriteLine("handled {0}", ex.Message);foreach (var innerEx in taskResult.Exception.InnerExceptions){Console.WriteLine("inner exception {0}", nnerEx.Message);}}}Task的取消執(zhí)行
如果直接使用ThreadPool中的Thread,我們是無法進(jìn)行取消操作的。但是現(xiàn)在Task類提供了一種基于CancellationTokenSource類的方式來取消任務(wù)的執(zhí)行,可以按照如下步驟來進(jìn)行:
下面讓我們看看如何設(shè)置超時時間來取消任務(wù)的執(zhí)行:
static void Main(string[] args){CallWithAsync();Console.ReadKey(); }async static void CallWithAsync(){try{CancellationTokenSource source = new CancellationTokenSource();source.CancelAfter(TimeSpan.FromSeconds(1));var t1 = await GreetingAsync("Bulbul", source.Token);}catch (OperationCanceledException ex){Console.WriteLine(ex.Message);}}static Task<string> GreetingAsync(string name, CancellationToken token){return Task.Run<string>(() =>{return Greeting(name, token);});}static string Greeting(string name, CancellationToken token){Thread.Sleep(3000);token.ThrowIfCancellationRequested();return string.Format("Hello, {0}", name);}并行編程
在.net 4.5中,存在一個叫做“Parallel”的類,這個類可以進(jìn)行并行操作。當(dāng)然這種并行和那些充分利用cpu計算能力的Thread 是有差別的,簡單說起來,它有兩種表現(xiàn)方式:
1.數(shù)據(jù)并行。 如果我們有很多的數(shù)據(jù)需要計算,我們需要他們并行的進(jìn)行,那么我們可以使用For或者ForEach方法來進(jìn)行:
ParallelLoopResult result =Parallel.For(0, 100, async (int i) =>{Console.WriteLine("{0}, task: {1}, thread: {2}", i,Task.CurrentId, Thread.CurrentThread.ManagedThreadId);await Task.Delay(10);});如果我們在計算的過程中,想要中斷并行,我們可以把ParallelLoopState當(dāng)做參數(shù)傳遞進(jìn)去,我們就可以實現(xiàn)認(rèn)為和中斷這種循環(huán):
ParallelLoopResult result =Parallel.For(0, 100, async (int i, ParallelLoopState pls) =>{Console.WriteLine("{0}, task: {1}, thread: {2}", i,Task.CurrentId, Thread.CurrentThread.ManagedThreadId);await Task.Delay(10);if (i > 5) pls.Break();});需要注意的是,當(dāng)我們需要中斷循環(huán)的時候,由于其運(yùn)行在諸多個線程之上,如果線程數(shù)多于我們設(shè)定的中斷數(shù)時,上述的執(zhí)行可能就不太準(zhǔn)確。
?參考文章:http://www.codeproject.com/Articles/996857/Asynchronous-programming-and-Threading-in-Csharp-N?bmkres=exist#_articleTop
?
引用:https://www.cnblogs.com/scy251147/p/4597615.html
?
?
異步編程總結(jié)
? ???最近在為公司的分布式服務(wù)框架做支持異步調(diào)用的開發(fā),這種新特性的上線需要進(jìn)行各種嚴(yán)格的測試。在并發(fā)性能測試時,性能一直非常差,而且非常的不穩(wěn)定。經(jīng)過不斷的分析調(diào)優(yōu),發(fā)現(xiàn)Socket通信和多線程異步回調(diào)存在較為嚴(yán)重的性能問題。經(jīng)過多方優(yōu)化,性能終于達(dá)標(biāo)。下面是原版本、支持異步最初版本和優(yōu)化后版本的性能比較。差異還是非常巨大的。另外說明一下,總耗時是指10000次請求累計執(zhí)行時間。
???? 從上圖可以看到,支持異步的版本,在單線程模式下,性能的表現(xiàn)與老版本差異并不明顯,但是10線程下差異就非常巨大,而100線程的測試結(jié)果反而有所好轉(zhuǎn)。通過分析,兩個版本的性能差異如此巨大,主要是因為:
???? 在網(wǎng)絡(luò)通信方面,把原先半異步的模式調(diào)整為了SocketAsyncEventArgs?模式。下面是Socket通信的幾種模型的介紹和示例,總結(jié)一下,與大家分享。下次再與大家分享,并發(fā)下異步調(diào)用的性能優(yōu)化方案。
APM方式:?Asynchronous Programming Model
????異步編程模型是一種模式,該模式允許用更少的線程去做更多的操作,.NET Framework很多類也實現(xiàn)了該模式,同時我們也可以自定義類來實現(xiàn)該模式。NET Framework中的APM也稱為Begin/End模式。此種模式下,調(diào)用BeginXXX方法來啟動異步操作,然后返回一個IAsyncResult 對象。當(dāng)操作執(zhí)行完成后,系統(tǒng)會觸發(fā)IAsyncResult 對象的執(zhí)行。 具體可參考:?https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/asynchronous-programming-model-apm
???? .net中的Socket異步模式也支持APM,與同步模式或Blocking模式相比,可以更好的利用網(wǎng)絡(luò)帶寬和系統(tǒng)資源編寫出具有更高性能的程序。參考具體代碼如下:
服務(wù)端監(jiān)聽:
????Socket serverSocket =?new?Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
//本機(jī)預(yù)使用的IP和端口
IPEndPoint serverIP =?new?IPEndPoint(IPAddress.Any, 9050);
//綁定服務(wù)端設(shè)置的IP
serverSocket.Bind(serverIP);
//設(shè)置監(jiān)聽個數(shù)
serverSocket.Listen(1);
//異步接收連接請求
serverSocket.BeginAccept(ar =>
{
??? base.communicateSocket = serverSocket.EndAccept(ar);
???AccessAciton();
?},?null);
客戶端連接:
var communicateSocket =?new?Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
???communicateSocket.Bind(new?IPEndPoint(IPAddress.Any, 9051));
?????????????
????????//服務(wù)器的IP和端口
????????IPEndPoint serverIP;
????????try
????????{
????????????serverIP =?new?IPEndPoint(IPAddress.Parse(IP), 9050);
????????}
????????catch
????????{
????????????throw?new?Exception(String.Format("{0}不是一個有效的IP地址!", IP));
????????}
?????????????
????????//客戶端只用來向指定的服務(wù)器發(fā)送信息,不需要綁定本機(jī)的IP和端口,不需要監(jiān)聽
????????try
????????{
?????????? communicateSocket.BeginConnect(serverIP, ar =>
????????????{
????????????????AccessAciton();
????????????},?null);
????????}
????????catch
????????{
????????????throw?new?Exception(string.Format("嘗試連接{0}不成功!", IP));
????????}
客戶端請求:
????if?(communicateSocket.Connected ==?false)
????????{
????????????throw?new?Exception("還沒有建立連接, 不能發(fā)送消息");
????????}
????????Byte[] msg = Encoding.UTF8.GetBytes(message);
????????communicateSocket.BeginSend(msg,0, msg.Length, SocketFlags.None,
????????????ar => {
?????????????????
????????????},?null);
?
服務(wù)端響應(yīng):
Byte[] msg =?new?byte[1024];
????????//異步的接受消息
????????communicateSocket.BeginReceive(msg, 0, msg.Length, SocketFlags.None,
????????????ar => {
????????????????//對方斷開連接時, 這里拋出Socket Exception??????????????
????????????????????communicateSocket.EndReceive(ar);
????????????????ReceiveAction(Encoding.UTF8.GetString(msg).Trim('\0',' '));
????????????????Receive(ReceiveAction);
????????????},?null);
?
??????注意:異步模式雖好,但是如果進(jìn)行大量異步套接字操作,是要付出很高代價的。針對每次操作,都必須創(chuàng)建一個IAsyncResult對象,而且該對象不能被重復(fù)使用。由于大量使用對象分配和垃圾收集,這會影響系統(tǒng)性能。如需要更好的理解APM模式,最了解EAP模式:Event-based Asynchronous Pattern:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/event-based-asynchronous-pattern-eap?。
?
TAP 方式:?Task-based Asynchronous Pattern
??????基于任務(wù)的異步模式,該模式主要使用System.Threading.Tasks.Task和Task<T>類來完成異步編程,相對于APM?模式來講,TAP使異步編程模式更加簡單(因為這里我們只需要關(guān)注Task這個類的使用),同時TAP也是微軟推薦使用的異步編程模式。APM與TAP的本質(zhì)區(qū)別,請參考我的一篇?dú)v史博客:http://www.cnblogs.com/vveiliang/p/7943003.html
???? TAP模式與APM模式是兩種異步模式的實現(xiàn),從性能上看沒有本質(zhì)的差別。TAP的資料可參考:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap?。參考具體代碼如下:
服務(wù)端:
publicclassStateContext
{
???// Client socket.???
???publicSocketWorkSocket =null;
???// Size of receive buffer.???
???publicconstintBufferSize = 1024;
???// Receive buffer.???
???publicbyte[] buffer =newbyte[BufferSize];
???// Received data string.???
???publicStringBuildersb =newStringBuilder(100);
}
publicclassAsynchronousSocketListener
{
???// Thread signal.???
???publicstaticManualResetEventreSetEvent =newManualResetEvent(false);
???publicAsynchronousSocketListener()
??? {
??? }
???publicstaticvoidStartListening()
??? {
???????// Data buffer for incoming data.???
???????byte[] bytes =newByte[1024];
???????// Establish the local endpoint for the socket.???
???????IPAddressipAddress =IPAddress.Parse("127.0.0.1");
???????IPEndPointlocalEndPoint =newIPEndPoint(ipAddress, 11000);
???????// Create a TCP/IP socket.???
???????Socketlistener =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
???????// Bind the socket to the local???
???????try
??????? {
??????????? listener.Bind(localEndPoint);
??????????? listener.Listen(100);
???????????while(true)
??????????? {
???????????????// Set the event to nonsignaled state.???
??????????????? reSetEvent.Reset();
???????????????// Start an asynchronous socket to listen for connections.???
???????????????Console.WriteLine("Waiting for a connection...");
??????????????? listener.BeginAccept(newAsyncCallback(AcceptCallback), listener);
???????????????// Wait until a connection is made before continuing.???
??????????????? reSetEvent.WaitOne();
??????????? }
??????? }
???????catch(Exceptione)
??????? {
???????????Console.WriteLine(e.ToString());
??????? }
???????Console.WriteLine("\nPress ENTER to continue...");
???????Console.Read();
??? }
???publicstaticvoidAcceptCallback(IAsyncResultar)
??? {
???????// Signal the main thread to continue.???
??????? reSetEvent.Set();
???????// Get the socket that handles the client request.???
???????Socketlistener = (Socket)ar.AsyncState;
???????Sockethandler = listener.EndAccept(ar);
???????// Create the state object.???
???????StateContextstate =newStateContext();
??????? state.WorkSocket = handler;
??????? handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);
??? }
???publicstaticvoidReadCallback(IAsyncResultar)
??? {
???????Stringcontent =String.Empty;
???????StateContextstate = (StateContext)ar.AsyncState;
???????Sockethandler = state.WorkSocket;
???????// Read data from the client socket.???
???????intbytesRead = handler.EndReceive(ar);
???????if(bytesRead > 0)
??????? {
???????????// There might be more data, so store the data received so far.???
??????????? state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
???????????// Check for end-of-file tag. If it is not there, read???
???????????// more data.???
??????????? content = state.sb.ToString();
???????????if(content.IndexOf("<EOF>") > -1)
??????????? {
???????????????Console.WriteLine("讀取 {0} bytes. \n 數(shù)據(jù): {1}", content.Length, content);
??????????????? Send(handler, content);
??????????? }
???????????else
??????????? {
??????????????? handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);
??????????? }
??????? }
??? }
???privatestaticvoidSend(Sockethandler,Stringdata)
??? {
???????byte[] byteData =Encoding.ASCII.GetBytes(data);
??????? handler.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), handler);
??? }
???privatestaticvoidSendCallback(IAsyncResultar)
??? {
???????try
??????? {
???????????Sockethandler = (Socket)ar.AsyncState;
???????????intbytesSent = handler.EndSend(ar);
???????????Console.WriteLine("發(fā)送 {0} bytes.", bytesSent);
??????????? handler.Shutdown(SocketShutdown.Both);
??????????? handler.Close();
??????? }
???????catch(Exceptione)
??????? {
???????????Console.WriteLine(e.ToString());
??????? }
??? }
???publicstaticintMain(String[] args)
??? {
??????? StartListening();
???????return0;
??? }
客戶端:
publicclassAsynchronousClient
{
???// The port number for the remote device.???
???privateconstintport = 11000;
???// ManualResetEvent instances signal completion.???
???privatestaticManualResetEventconnectResetEvent =newManualResetEvent(false);
???privatestaticManualResetEventsendResetEvent =newManualResetEvent(false);
???privatestaticManualResetEventreceiveResetEvent =newManualResetEvent(false);
???privatestaticStringresponse =String.Empty;
???privatestaticvoidStartClient()
??? {
???????try
??????? {
?????????
???????????IPAddressipAddress =IPAddress.Parse("127.0.0.1");
???????????IPEndPointremoteEP =newIPEndPoint(ipAddress, port);
???????????// Create a TCP/IP socket.???
???????????Socketclient =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
???????????// Connect to the remote endpoint.???
??????????? client.BeginConnect(remoteEP,newAsyncCallback(ConnectCallback), client);
??????????? connectResetEvent.WaitOne();
??????????? Send(client,"This is a test<EOF>");
??????????? sendResetEvent.WaitOne();
??????????? Receive(client);
??????????? receiveResetEvent.WaitOne();
???????????Console.WriteLine("Response received : {0}", response);
???????????// Release the socket.???
??????????? client.Shutdown(SocketShutdown.Both);
??????????? client.Close();
???????????Console.ReadLine();
??????? }
???????catch(Exceptione)
??????? {
???????????Console.WriteLine(e.ToString());
??????? }
??? }
???privatestaticvoidConnectCallback(IAsyncResultar)
??? {
???????try
??????? {
???????????Socketclient = (Socket)ar.AsyncState;
??????????? client.EndConnect(ar);
???????????Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint.ToString());
??????????? connectResetEvent.Set();
??????? }
???????catch(Exceptione)
??????? {
???????????Console.WriteLine(e.ToString());
??????? }
??? }
???privatestaticvoidReceive(Socketclient)
??? {
???????try
??????? {
???????????StateContextstate =newStateContext();
??????????? state.WorkSocket = client;
??????????? client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);
??????? }
???????catch(Exceptione)
??????? {
???????????Console.WriteLine(e.ToString());
??????? }
??? }
???privatestaticvoidReceiveCallback(IAsyncResultar)
??? {
???????try
??????? {
???????????StateContextstate = (StateContext)ar.AsyncState;
???????????Socketclient = state.WorkSocket;
???????????intbytesRead = client.EndReceive(ar);
???????????if(bytesRead > 0)
??????????? {
??????????????? state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
??????????????? client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);
??????????? }
???????????else
??????????? {
???????????????if(state.sb.Length > 1)
??????????????? {
??????????????????? response = state.sb.ToString();
??????????????? }
??????????????? receiveResetEvent.Set();
??????????? }
??????? }
???????catch(Exceptione)
??????? {
???????????Console.WriteLine(e.ToString());
??????? }
??? }
???privatestaticvoidSend(Socketclient,Stringdata)
??? {
???????byte[] byteData =Encoding.ASCII.GetBytes(data);
??????? client.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), client);
??? }
???privatestaticvoidSendCallback(IAsyncResultar)
??? {
???????try
??????? {
???????????Socketclient = (Socket)ar.AsyncState;
???????????intbytesSent = client.EndSend(ar);
???????????Console.WriteLine("Sent {0} bytes to server.", bytesSent);
??????????? sendResetEvent.Set();
??????? }
???????catch(Exceptione)
??????? {
???????????Console.WriteLine(e.ToString());
??????? }
??? }
???publicstaticintMain(String[] args)
??? {
??????? StartClient();
???????return0;
??? }
}
SAEA方式:?SocketAsyncEventArgs
????? APM模式、TAP模式雖然解決了Socket的并發(fā)問題,但是在大并發(fā)下還是有較大性能問題的。這主要是因為上述兩種模式都會生產(chǎn) IAsyncResult 等對象 ,而大量垃圾對象的回收會非常影響系統(tǒng)的性能。為此,微軟推出了?SocketAsyncEventArgs?。SocketAsyncEventArgs 是?.NET Framework 3.5?開始支持的一種支持高性能 Socket 通信的實現(xiàn)。SocketAsyncEventArgs 相比于 APM 方式的主要優(yōu)點(diǎn)可以描述如下,無需每次調(diào)用都生成 IAsyncResult 等對象,向原生 Socket 更靠近一些。這是官方的解釋:
The main feature of these enhancements is the?avoidance of the repeated allocation and synchronization of objects?during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.
????? SocketAsyncEventArgs主要為高性能網(wǎng)絡(luò)服務(wù)器應(yīng)用程序而設(shè)計,避免了在異步套接字 I/O 量非常大時,大量垃圾對象創(chuàng)建與回收。使用此類執(zhí)行異步套接字操作的模式包含以下步驟,具體說明可參考:https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx?。
??? 下面是封裝的一個組件代碼:
classBufferManager
??? {
???????intm_numBytes;????????????????// the total number of bytes controlled by the buffer pool
???????byte[] m_buffer;???????????????// the underlying byte array maintained by the Buffer Manager
???????Stack<int> m_freeIndexPool;????//
???????intm_currentIndex;
???????intm_bufferSize;
???????publicBufferManager(inttotalBytes,intbufferSize)
??????? {
??????????? m_numBytes = totalBytes;
??????????? m_currentIndex = 0;
??????????? m_bufferSize = bufferSize;
??????????? m_freeIndexPool =newStack<int>();
??????? }
???????// Allocates buffer space used by the buffer pool
???????publicvoidInitBuffer()
??????? {
???????????// create one big large buffer and divide that
???????????// out to each SocketAsyncEventArg object
??????????? m_buffer =newbyte[m_numBytes];
??????? }
???????// Assigns a buffer from the buffer pool to the
???????// specified SocketAsyncEventArgs object
???????//
???????// <returns>true if the buffer was successfully set, else false</returns>
???????publicboolSetBuffer(SocketAsyncEventArgsargs)
??????? {
???????????if(m_freeIndexPool.Count > 0)
??????????? {
??????????????? args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
??????????? }
???????????else
??????????? {
???????????????if((m_numBytes - m_bufferSize) < m_currentIndex)
??????????????? {
???????????????????returnfalse;
??????????????? }
??????????????? args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
??????????????? m_currentIndex += m_bufferSize;
??????????? }
???????????returntrue;
??????? }
???????// Removes the buffer from a SocketAsyncEventArg object.
???????// This frees the buffer back to the buffer pool
???????publicvoidFreeBuffer(SocketAsyncEventArgsargs)
??????? {
??????????? m_freeIndexPool.Push(args.Offset);
??????????? args.SetBuffer(null, 0, 0);
??????? }
??? }
???///<summary>
???///This class is used to communicate with a remote application over TCP/IP protocol.
???///</summary>
???classTcpCommunicationChannel
??? {
??????
???????#regionPrivate fields
???????///<summary>
???????///Size of the buffer that is used to receive bytes from TCP socket.
???????///</summary>
???????privateconstintReceiveBufferSize = 8 * 1024;//4KB
???????///<summary>
???????///This buffer is used to receive bytes
???????///</summary>
???????privatereadonlybyte[] _buffer;
???????///<summary>
???????///Socket object to send/reveice messages.
???????///</summary>
???????privatereadonlySocket_clientSocket;
???????///<summary>
???????///A flag to control thread's running
???????///</summary>
???????privatevolatilebool_running;
???????///<summary>
???????///This object is just used for thread synchronizing (locking).
???????///</summary>
???????privatereadonlyobject_syncLock;
???????privateBufferManagerreceiveBufferManager;
???????privateSocketAsyncEventArgsreceiveBuff =null;
???????#endregion
???????#regionConstructor
???????///<summary>
???????///Creates a new TcpCommunicationChannel object.
???????///</summary>
???????///<param name="clientSocket">A connected Socket object that is
???????///used to communicate over network</param>
???????publicTcpCommunicationChannel(SocketclientSocket)
??????? {
??????????? _clientSocket = clientSocket;
??????????? _clientSocket.Blocking =false;
??????????? _buffer =newbyte[ReceiveBufferSize];
??????????? _syncLock =newobject();
??????????? Init();
??????? }
???????privatevoidInit()
??????? {
???????????//初始化接收Socket緩存數(shù)據(jù)
??????????? receiveBufferManager =newBufferManager(ReceiveBufferSize*2, ReceiveBufferSize);
??????????? receiveBufferManager.InitBuffer();
??????????? receiveBuff =newSocketAsyncEventArgs();
??????????? receiveBuff.Completed += ReceiveIO_Completed;
??????????? receiveBufferManager.SetBuffer(receiveBuff);
???????????//初始化發(fā)送Socket緩存數(shù)據(jù)
??????? }
???????#endregion
???????#regionPublic methods
???????///<summary>
???????///Disconnects from remote application and closes channel.
???????///</summary>
???????publicvoidDisconnect()
??????? {
??????????? _running =false;
??????????? receiveBuff.Completed -= ReceiveIO_Completed;
??????????? receiveBuff.Dispose();
???????????if(_clientSocket.Connected)
??????????? {
??????????????? _clientSocket.Close();
??????????? }
??????????? _clientSocket.Dispose();
??????? }
???????#endregion
?????
???????publicvoidStartReceive()
??????? {
??????????? _running =true;
???????????boolresult = _clientSocket.ReceiveAsync(receiveBuff);
??????? }
???????privatevoidReceiveIO_Completed(objectsender,SocketAsyncEventArgse)
??????? {
???????????if(e.BytesTransferred > 0 && e.SocketError ==SocketError.Success && _clientSocket.Connected ==true&& e.LastOperation ==SocketAsyncOperation.Receive)
??????????? {
???????????????if(!_running)
??????????????? {
???????????????????return;
??????????????? }
???????????????//Get received bytes count
???????????????DateTimereceiveTime =DateTime.Now;
???????????????//Copy received bytes to a new byte array
???????????????varreceivedBytes =newbyte[e.BytesTransferred];
???????????????Array.Copy(e.Buffer, 0, receivedBytes, 0, e.BytesTransferred);
???????????????//處理消息....
???????????????if(_running)
??????????????? {
??????????????????? StartReceive();
??????????????? }
??????????? }
??????? }
???????///<summary>
???????///Sends a message to the remote application.
???????///</summary>
???????///<param name="message">Message to be sent</param>
???????publicvoidSendMessage(byte[] messageBytes)
??????? {
???????????//Send message
???????????if(_clientSocket.Connected)
??????????? {
???????????????SocketAsyncEventArgsdata =newSocketAsyncEventArgs();
??????????????? data.SocketFlags =SocketFlags.None;
??????????????? data.Completed += (s, e) =>
??????????????? {
??????????????????? e.Dispose();
??????????????? };
??????????????? data.SetBuffer(messageBytes, 0, messageBytes.Length);
???????????????//Console.WriteLine("發(fā)送:" + messageBytes.LongLength);
??????????????? _clientSocket.SendAsync(data);
??????????? }
??????? }
??? }
總結(jié)
以上是生活随笔為你收集整理的【转】.Net中的异步编程总结的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 民生白条联名卡可以取现吗?手续费是多少?
- 下一篇: 手上有5000块钱随时用的钱,除了放支付