ACE入门
ACE編譯
1. 設置環境變量
2. 在 ACE_wrappers\ace 目錄下創建 config.h 文件,寫入:
#include "ace/config-win32.h"?
3.?如果你希望使用標準的?C++?頭文件(例如?iostream、cstdio?等)在?#include?"ace/config-win32.h"?前加入:?
#define?ACE_HAS_STANDARD_CPP_LIBRARY?1?
4.?如果你希望使用?MFC?庫,那么?config.h?中加入:?
#define?ACE_HAS_MFC?1?
如果你希望使用?MFC?靜態庫,那么加入:?
#define?ACE_USES_STATIC_MFC?
5.?如果你希望編譯靜態版本的?ACE?庫,那么在?config.h?中加入:?
#define?ACE_AS_STATIC_LIBS?
6.?如果你希望減少靜態庫的大小,可以禁止使用?inline,在?config.h?的?#include?"ace/config-win32.h"?前加入:?
#define?ACE_NO_INLINE?
?
ACE結構簡介
1)ACE OS adaptation 層:封裝了 OS API,對上層提供 OS 平臺無關的接口。
2)C++ wrapper facades 層:位于 OS adaptation 之上,提供了與之相似的功能,這些功能使用 C++ 的類封裝起來,而不是 C 語言 API。每個 wrapper facade 都包含一個或者一個以上的類。我們可以有選擇的繼承、聚合這些 wrapper facade。
3)框架層(Framework layer)
框架層在 C++ wrapper facades 層之上,它集成和擴充了 wrapper facade 類。
<1> 事件多路分離和分發框架
ACE Reactor 和 ACE Proactor 實現了 Reactor 模式和 Proactor 模式。
<2> 連接建立和服務初始化框架
ACE Acceptor-Connector 框架實現了 Acceptor-Connector 模式。
<3> 并發框架
ACE 提供了 Task 框架實現了并發模式。
<4> 服務配置框架
ACE 的服務配置框架實現了 Component Configurator 模式。
<5> 流框架
ACE 的流框架實現了 Pipes and Fiters 模式。
4)ACE 網絡組件層
組件(component)就是軟件系統中被封裝的一個部分,ACE 發行包中的組件用于提供以下功能:
<1> 演示 ACE
<2> 提供常見網絡服務的可復用實現。如提供日志記錄、時間同步等服務的可復用實現。
?
線程的創建與管理
一. 線程入口函數
所有線程必須從一個指定的函數開始執行,該函數稱為線程函數,它必須具有下列原型:
void* worker(void *arg) {}
該函數輸入一個void *型的參數,可以在創建線程時傳入。
注意:所有的線程啟動函數(方法)必須是靜態的或全局的(就如同直接使用OS線程API時所要求的一樣)。
二.線程基本操作
1.創建一個線程
一個進程的主線程是由操作系統自動生成,如果你要讓一個主線程創建額外的線程,可以通過ACE_Thread::spawn()實現,該函數一般的使用方式如下:
ACE_thread_t threadId; ACE_hthread_t threadHandle; ACE_Thread::spawn( (ACE_THR_FUNC)worker, //線程執行函數 NULL, //執行函數參數 THR_JOINABLE | THR_NEW_LWP, &threadId, &threadHandle );ACE_Thread::spawn((ACE_THR_FUNC)worker) 使用其默認參數,來創建一個worker的線程。
ACE_Thread::spawn_n函數來創建多個線程。
2.終止線程
在線程函數體中ACE_Thread::exit()調用即可終止線程執行。
3.設定線程的相對優先級
當一個線程被首次創建時,它的優先級等同于它所屬進程的優先級。一個線程的優先級是相對于其所屬的進程的優先級而言的??梢酝ㄟ^調用ACE_Thread::setprio函數改變線程的相對優先級,該函數的調用方式如下:
ACE_Thread::setprio(threadHandle,ACE_DEFAULT_THREAD_PRIORITY)
4.掛起及恢復線程
掛起線程可以通過來實現,它能暫停一個線程的執行,其調用方式如下ACE_Thread::suspend(threadHandle) 。
相應的,可以通過ACE_Thread::resume(threadHandle) 恢復被掛起的線程的執行。
5.等待線程結束
在主函數中調用ACE_Thread::join(threadHandle)可阻塞主函數,直道線程結束才能繼續執行。
6.停止線程
在主函數中調用ACE_Thread::cancel (threadHandle)可停止線程的執行(在Unix底下可以,而在windows下好像不起作用,有待檢驗)。
三.程序示例
下面例子演示了如何用ace創建一個線程。
#include "ace/Thread.h" #include "ace/Synch.h"#pragma comment(lib, "ACEd.lib")#include <iostream> using namespace std; void* worker(void *arg) { for(int i=0;i<10;i++){ACE_OS::sleep(1);cout<<endl<<"hello world"<<endl;} return NULL; } int main(int argc, char *argv[]) { ACE_thread_t threadId;ACE_hthread_t threadHandle;ACE_Thread::spawn((ACE_THR_FUNC)worker, //線程執行函數NULL, //執行函數參數THR_JOINABLE | THR_NEW_LWP,&threadId,&threadHandle);ACE_Thread::join(threadHandle); return 0; }在這個簡單的例子中,創建了1個工作者線程,執行程序中定義的worker()函數。然后阻塞主函數,待線程結束后退出程序。
ACE Lock類屬
鎖類屬包含的類包裝簡單的鎖定機制,比如互斥體、信號量、讀/寫互斥體和令牌等。這里我就以互斥體為例簡單的介紹一下其使用方法,對其它的鎖類進行一些簡單的說明。
1.互斥體 ACE_Thread_Mutex
互斥體用于保護共享的易變代碼,也就是全局或靜態數據。這樣的數據必須通過互斥體進行保護,以防止它們在多個線程同時訪問時損壞。??
#include "ace/Thread.h" #include "ace/Synch.h" #include <iostream> using namespace std; ACE_Thread_Mutex mutex; void* Thread1(void *arg) {mutex.acquire();ACE_OS::sleep(3);cout<<endl<<"hello thread1"<<endl;mutex.release(); return NULL; } void* Thread2(void *arg) {mutex.acquire();cout<<endl<<"hello thread2"<<endl;mutex.release(); return NULL; } int main(int argc, char *argv[]) { ACE_Thread::spawn((ACE_THR_FUNC)Thread1); //Thread2 比Thread1晚創建1秒鐘,故后嘗試獲取互斥體ACE_OS::sleep(1);ACE_Thread::spawn((ACE_THR_FUNC)Thread2); while(true)ACE_OS::sleep(10); return 0; }ACE_Thread_Mutex主要有兩個方法:
當線程要訪問共享資源時,首先調用acquire()方法獲取互斥體,從而獲取對改互斥體所保護的共享資源的唯一訪問權限,訪問結束時調用釋放互斥體,使得其它線程能獲取共享資源的訪問權限。
2.ACE Lock類屬簡介。
ACE Lock類屬列表如下:
ACE_Mutex
封裝互斥機制(根據平臺,可以是mutex_t、pthread_mutex_t等等)的包裝類,用于提供簡單而有效的機制來使對共享資源的訪問序列化。它與二元信號量(binary semaphore)的功能相類似。可被用于線程和進程間的互斥。
ACE_Thread_Mutex
可用于替換ACE_Mutex,專用于線程同步。
ACE_Process_Mutex
可用于替換ACE_Mutex,專用于進程同步。
ACE_NULL_Mutex
提供了ACE_Mutex接口的"無為"(do-nothing)實現,可在不需要同步時用作替換。
ACE_RW_Mutex
封裝讀者/作者鎖的包裝類。它們是分別為讀和寫進行獲取的鎖,在沒有作者在寫的時候,多個讀者可以同時進行讀取。
ACE_RW_Thread_Mutex
可用于替換ACE_RW_Mutex,專用于線程同步。
ACE_RW_Process_Mutex
可用于替換ACE_RW_Mutex,專用于進程同步。
ACE_Semaphore
這些類實現計數信號量,在有固定數量的線程可以同時訪問一個資源時很有用。在OS不提供這種同步機制的情況下,可通過互斥體來進行模擬。
ACE_Thread_Semaphore
應被用于替換ACE_Semaphore,專用于線程同步。
ACE_Process_Semaphore
應被用于替換ACE_Semaphore,專用于進程同步。
ACE_Token
提供"遞歸互斥體"(recursive mutex),也就是,當前持有某令牌的線程可以多次重新獲取它,而不會阻塞。而且,當令牌被釋放時,它確保下一個正阻塞并等待此令牌的線程就是下一個被放行的線程。
ACE_Null_Token
令牌接口的"無為"(do-nothing)實現,在你知道不會出現多個線程時使用。
ACE_Lock
定義鎖定接口的接口類。一個純虛類,如果使用的話,必須承受虛函數調用開銷。
ACE_Lock_Adapter
基于模板的適配器,允許將前面提到的任意一種鎖定機制適配到ACE_Lock接口。
可以簡單的分為以下幾類:
互斥鎖(通常稱為"互斥體"或"二元信號量")用于保護多線程控制并發訪問的共享資源的完整性。互斥體通過定義臨界區來序列化多線程控制的執行,在臨界區中每一時刻只有一個線程在執行它的代碼?;コ怏w簡單而高效(時間和空間)。
ACE線程庫提供了Mutex式的類(是一組互斥體對象,擁有類似的接口),他是一種簡單而高效的類型是"非遞歸"互斥體。非遞歸互斥體不允許當前擁有互斥體的線程在釋放它之前重新獲取它。否則,將會立即發生死鎖。遞歸互斥體在ACE Recursive_Thread_Mutex類中可移植地實現。
讀者/作者鎖與互斥體相類似。例如,獲取讀者/作者鎖的線程也必須釋放它。多個線程可同時獲取一個讀者/作者鎖用于讀,但只有一個線程可以獲取該鎖用于寫。當互斥體保護的資源用于讀遠比用于寫要頻繁時,讀者/作者互斥體有助于改善并發的執行。
ACE線程庫提供了一個叫作RW_Mutex的類,在C++封裝類中可移植地實現了讀者/作者鎖的語義。讀者/作者鎖將優先選擇權給作者。因而,如果有多個讀者和一個作者在鎖上等待,作者將會首先獲取它。
計數信號量
在概念上,計數信號量是可以原子地增減的整數。如果線程試圖減少一個值為零的信號量的值,它就會阻塞,直到另一個線程增加該信號量的值。
計數信號量用于追蹤共享程序狀態的變化。它們記錄某種特定事件的發生。因為信號量維護狀態,它們允許線程根據該狀態來作決定,即使事件是發生在過去。
信號量比互斥體效率要低,但是,它們要更為通用,因為它們無需被最初獲取它們的同一線程獲取和釋放。這使得它們能夠用于異步的執行上下文中(比如信號處理器)。ACE線程庫提供一個叫作Semaphore的類來可移植地在C++包裝類中實現信號量語義。
ACE Guard類屬
與C一級的互斥體API相比較,Mutex包裝為同步多線程控制提供了一種優雅的接口。但是,Mutex潛在地容易出錯,因為程序員有可能忘記調用release方法(當然,C級的互斥體API更容易出錯)。這可能由于程序員的疏忽或是C++異常的發生而發生,然而,其導致及其嚴重的后果--死鎖。
因此,為改善應用的健壯性,ACE同步機制有效地利用C++類構造器和析構器的語義來確保Mutex鎖被自動獲取和釋放。
ACE提供了一個稱為Guard、Write_Guard和Read_Guard的類族,確保在進入和退出C++代碼塊時分別自動獲取和釋放鎖。
Guard類是最基本的守衛機制,定義可以簡化如下(實際定義比這相對要復雜而完善一點):
template <class LOCK>
class Guard
{
public:
??? Guard (LOCK &l): lock_ (&l){ lock_.acquire (); }
??? ?Guard (void) {??? lock_.release (); }
private:
??? LOCK lock_;
}
Guard類的對象定義一"塊"代碼,在其上鎖被自動獲取,并在退出塊時自動釋放,即使是程序拋異常也能保證自動解鎖。這種機制也能為Mutex、RW_Mutex和Semaphore同步封裝工作。
對于讀寫鎖,由于加鎖接口不一樣,ace也提供了相應的Read_Guard和Write_Guard類,Read_Guard和Write_Guard類有著與Guard類相同的接口。但是,它們的acquire方法分別對鎖進行讀和寫。
缺省地, Guard類構造器將會阻塞程序,直到鎖被獲取。會有這樣的情況,程序必須使用非阻塞的acquire調用(例如,防止死鎖)。因此,可以傳給ACE Guard的構造器第二個參數(請參看原始代碼,而不是我這里的簡化代碼),指示它使用鎖的try_acquire方法,而不是acquire。隨后調用者可以使用Guard的locked方法來原子地測試實際上鎖是否已被獲取。
用Guard重寫上一節的Thread1方法如下(注釋了的部分是原有代碼):
void* Thread1(void *arg)
{
??? ACE_Guard<ACE_Thread_Mutex> guard(mutex);
//mutex.acquire();
??? ACE_OS::sleep(3);
??? cout<<endl<<"hello thread1"<<endl;
//mutex.release();
return NULL;
}
相比較而言,使用Guard更加簡潔,并且會自動解鎖,免除了一部分后顧之憂。
注意:
Guard是在Guard變量析構時解鎖,如果在同一函數中兩次對同一互斥體變量使用Guard要注意其對象生命周期,否則容易造成死鎖。
?
ACE Condition類屬
ACE Condition類屬(條件變量)提供風格與互斥體、讀者/作者鎖和計數信號量不同的鎖定機制。當持有鎖的線程在臨界區執行代碼時,這三種機制讓協作線程進行等待。相反,條件變量通常被一個線程用于使自己等待,直到一個涉及共享數據的條件表達式到達特定的狀態。當另外的協作線程指示共享數據的狀態已發生變化,調度器就喚醒一個在該條件變量上掛起的線程。于是新喚醒的線程重新對它的條件表達式進行求值,如果共享數據已到達合適狀態,就恢復處理。
ACE線程庫提供一個叫作Condition的類來可移植地在C++包裝類中實現條件變量語義。定義方式如下:
ACE_Thread_Mutex mutex;
ACE_Condition<ACE_Thread_Mutex> cond(mutex);
該對象有兩個常用方法。
向使用該條件變量的其它線程發送滿足條件信號。
查詢是否滿足條件,如果滿足,則繼續往下執行;如果不滿足條件,主線程就等待在此條件變量上。條件變量隨即自動釋放互斥體,并使主線程進入睡眠。
條件變量總是與互斥體一起使用。這是一種可如下描述的一般模式:
while( expression NOT TRUE ) wait on condition variable;
條件變量不是用于互斥,往往用于線程間的協作,下面例子演示了通過條件變量實現線程協作。
#include "ace/Thread.h" #include "ace/Synch.h" #include <iostream> using namespace std; ACE_Thread_Mutex mutex; ACE_Condition<ACE_Thread_Mutex> cond(mutex); void* worker(void *arg) { ACE_OS::sleep(2); //保證eater線程的cond.wait()在worker線程的cond.signal()先執行 mutex.acquire(); ACE_OS::sleep(1); cout<<endl<<"produce"<<endl; cond.signal(); mutex.release(); return NULL; } void* eater(void *arg) { mutex.acquire(); cond.wait(); cout<<endl<<"eat"<<endl; mutex.release(); return NULL; } int main(int argc, char *argv[]) { ACE_Thread::spawn((ACE_THR_FUNC)worker); ACE_OS::sleep(1); ACE_Thread::spawn((ACE_THR_FUNC)eater); while(true) ACE_OS::sleep(10); return 0; }這個例子中,首先創建了一個生產者線程worker和一個消費者線程eater,消費者線程執行比生產者快,兩個線程不加限制并發執行會導致先消費,后生產的情況(只是加互斥鎖也不能很好的解決,以為無法保證生產者一定先獲得互斥體)。所以這里通過條件變量的通知方式保證線程的順序執行:
消費者線程獲取互斥體,等待條件滿足(生產者生產了食品)。同時釋放互斥體,進入休眠狀態。
生產者獲取互斥體(雖然是消費者先獲取的互斥體,但消費者調用的wait函數會釋放消費者的互斥體),生產商品后,通過條件變量發送信號(調用signal函數)通知消費者生產完成,結束生產過程,釋放互斥體。
消費者收到信號后,重新獲取互斥體,完成消費過程。
使用條件變量的注意事項:
條件變量必須和互斥體一起使用,也就是說使用前必須加鎖(調用互斥體acquire函數),使用完后需釋放互斥體。
條件變量中的wait()和signal()成對使用的話,必須保證wait()函數在signal()之前執行,這樣才能保證wait()能收到條件滿足通知,不至于一直等待下去,形成死鎖(worker線程中的第一句話就是起的這個作用)。
?
ACE Synchronization類
這一類并發控制對象一般也叫做雜項并發類,這類對象一般用得不多,這里我只是對其作一些簡單的介紹。
1.Atomic_Op類
ACE_Atomic_Op類用于將同步透明地參數化進基本的算術運算中。
ACE_Atomic_Op是一種模板類,鎖定機制和需要參數化的類型被作為參數傳入其中,重載所有算術操作符,并確保在操作前獲取鎖,在操作后釋放它。運算本身被委托給通過模板傳入的的類。
使用ACE_Atomic_Op進行變量封裝時,對于那些用ACE_Atomic_Op封裝了的變量操作都變成了線程安全的,而并看不到顯式的加解鎖代碼,代碼變得更簡潔,優雅。
2.ACE中的柵欄(Barrier)
一組線程可以使用柵欄來進行共同的相互同步。組中的每個線程各自執行,直到到達柵欄,就阻塞在那里。在所有相關線程到達柵欄后,它們就全部繼續它們的執行。就是說,它們一個接一個地阻塞,等待其他的線程到達柵欄;一旦所有線程都到達了它們的執行路徑中的"柵欄點",它們就一起重新啟動。
在ACE中,柵欄在ACE_Barrier類中實現。在柵欄對象被實例化時,它將要等待的線程的數目會作為參數傳入。一旦到達執行路徑中的"柵欄點",每個線程都在柵欄對象上發出wait()調用。它們在這里阻塞,直到其他線程到達它們各自的"柵欄點",然后再一起繼續執行。當柵欄從相關線程那里接收了適當數目的wait()調用時,它就同時喚醒所有阻塞的線程。
舉個簡單的例子,運動員進行賽跑比賽時,雖然他們到達終點有先后順序,但會等到所有的運動員跑完比賽后才一起領獎。
?
面向對象的線程類ACE_Task
?
我們在前一章中使用ACE_Thread包裝時,你一定已經注意到了一些不夠"優雅"的地方。那一章中的大多數程序都被分解為函數、而不是對象。這是因為ACE_Thread包裝需要一個全局函數名、或是靜態方法作為參數。隨后該函數(靜態方法)就被用作所派生的線程的"啟動點"。這自然就使得程序員要為每個線程寫一個函數。如我們已經看到的,這可能會導致非面向對象的程序分解。
ACE_Task對常用線程處理進行了OO包裝,通過ACE_Task,能對線程進行更好的操作。
ACE_Task是ACE中的任務或主動對象“處理結構”的基類。ACE使用此類來實現主動對象模式。所有希望成為“主動對象”的對象都必須由此類派生。同時可將它看作是更高級的、更為面向對象的線程。
每個任務都含有一或多個線程,以及一個底層消息隊列。各個任務通過消息隊列進行通信。至于消息隊列實現的內在細節程序員不必關注。發送任務用putq() 將消息插入到另一任務的消息隊列中,接收任務通過使用getq()將消息提取出來。這樣的體系結構大大簡化了多線程程序的編程模型。
其主要成員如下:
open():初始化資源
close():釋放資源
activate():啟動線程,可指定線程的數目
svc():線程的啟動位置
????? putq():放置消息到任務的消息隊列中
????? getq():從任務的消息隊列中取出消息
????? thr_count():返回任務中線程的數目
????? last_thread():返回任務中將線程計數器從1降為0的線程的ID
?
要創建任務,需要進行以下步驟:
open()方法應該包含所有專屬于任務的初始化代碼。其中可能包括諸如連接控制塊、鎖和內存這樣的資源。close()方法是相應的終止方法。
在主動對象實例化后,你必須通過調用activate()啟用它。要在主動對象中創建的線程的數目,以及其他一些參數,被傳遞給activate()方法。activate()方法會使svc()方法成為所有它生成的線程的啟動點。
如上面所提到的,在主動對象被啟用后,各個新線程在svc()方法中啟動。應用開發者必須在子類中定義此方法。
下面的例子演示怎樣去創建任務:
#include "ace/Task.h" #include "ace/OS.h" #include <iostream> using namespace std; class TaskThread: public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc(void) { for(int i=0;i<10;i++) { ACE_OS::sleep(1); cout<<endl<<"hello thread1"<<endl; } return 0; } }; int main(int argc, char *argv[]) { TaskThread task; task.activate(); while(true) ACE_OS::sleep(10); return 0; }ACE_Task也封裝了常用線程操作,如暫停,恢復及停止等,是不是非常簡單和方便呢。
其實ACE_Task的使用還不僅僅是這些,通過它還可實現一種很常用的網絡編程模式--主動對象模式,其具體功能在后續的設計模式部分將作詳細的介紹。
ACE_Message_Queue
在Windows和Linux的config文件中都沒有定義"ACE_HAS_TIMED_MESSAGE_BLOCKS"這個宏,所以msg_deadline_time和msg_execution_time都不起任何作用.
ACE_Message_Queue_Factory這個工廠提供三個靜態函數分別用來創建靜態消息隊列和兩種類型的動態消息隊列。靜態消息隊列的消息也支持優先級,但是消息的優先級是靜態的,不需要通過動態計算而來。水位用來控制消息隊列中數據的大小,高水位(high_water_mark)用于控制消息隊列的上限,它用于控制生產者往里面放數據的量,如果消息隊列中數據量已經達到高水位,而用使用了鎖,既使“ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();”創建消息隊列,那么生產者將被阻塞。高水位很容易理解,但是低水位是用來做什么的呢?
只要消息隊列中還有數據消費者就不會被阻塞的,而當數據量超過高水位時,生產者會被阻塞,既然會被阻塞,那么它肯定需要被喚醒,那么什么時候由誰來喚醒生產者呢?這就是低水位的作用,消費者一直消費數據,當數據低于低水位時它就喚醒生產者。
下面的代碼很好的展示了靜態消息隊列的使用。
#include "ace/Message_Queue.h"#include "ace/Get_Opt.h"
#include "ace/OS.h"
#include <ace/Thread_Manager.h>
#include <ace/Synch.h>
//消息隊列指針
ACE_Message_Queue<ACE_MT_SYNCH>* mq;
const char S1[] = "C++";
const char S2[] = "Java";
const char S3[] = "PHP";
const char S4[] = "C#";
//四個消息指針
ACE_Message_Block* mb1, * mb2, * mb3, * mb4;
//生產者
static void* produce(void *arg)
{
static int loop = 1;
while(true)
{
ACE_OS::sleep(2);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer...\n"));
while(true)
{
if(loop == 1)
{
//將高水位設置為10, S1+S2的長度為3+4+2=9<10,因此可以將S3放進去
//但是再放入S4時生產者將會被阻塞
//需要注意的是水位的大小并不是消息的個數,而是消息隊列中消息里面的數據量之和
//如果也能以消息的個數作為高低水位的值就好了
mq->high_water_mark(10);
mq->enqueue_prio (mb1);
mq->enqueue_prio (mb2);
mq->enqueue_prio (mb3);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending!!\n"));
//因為消費者在睡眠6秒之后才會調用deactivate,因此生產者會在這兒阻塞幾秒鐘
//可以不斷地將msg_bytes打印出來觀察觀察
int ret = mq->enqueue_prio (mb4);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by deactivate, ret = %d!!\n", ret));
++loop;
}
if(loop == 2)
{
ACE_OS::sleep(6);
//將低水位設置為5,因為高水位仍然為10,當前的數據量又超過了10,
//所以下面的入隊操作仍會將生產者阻塞
//這樣消費者消費消息,當數據量小于5時,將喚醒生產者
//生產者在此處等待被消費者喚醒
mq->low_water_mark(5);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending again!!\n"));
mq->enqueue_prio (mb4);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by consumer!!\n"));
++loop;
}
}
}
return NULL;
}
//消費者
void* consume(void *arg)
{
static int loop = 1;
while(true)
{
ACE_OS::sleep(2);
ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer...\n"));
if(loop == 1)
{
//等待6秒,此時生產者和消費者都將被阻塞
ACE_OS::sleep(6);
//deactivate會喚醒所有的線程,將消息隊列設置為不可用
//以后所存取操作都會返回-1
//這個操作會喚醒生產者
mq->deactivate();
++loop;
}
if(loop == 2)
{
ACE_OS::sleep(2);
//將消息隊列的狀態設置成ACTIVATED
//消息又可以使用了
mq->activate();
++loop;
}
if(loop == 3)
{
ACE_OS::sleep(10);
//消費兩個消息之后,數據量就小于5了,低于低水位將喚醒生產者
ACE_Message_Block *mb;
mq->dequeue_head (mb);
mq->dequeue_head (mb);
ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer wake up producer!!\n"));
++loop;
}
}
return NULL;
}
int main(int argc, char* argv[])
{
mq = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();
int priority;
//使用隨機數作為消息的優先級
//數字越高,優先級越高
priority = ACE_OS::rand() % 100;
mb1 = new ACE_Message_Block(S1, sizeof S1, priority);
priority = ACE_OS::rand() % 100;
mb2 = new ACE_Message_Block(S2, sizeof S2, priority);
priority = ACE_OS::rand() % 100;
mb3 = new ACE_Message_Block(S3, sizeof S3, priority);
priority = ACE_OS::rand() % 100;
mb4 = new ACE_Message_Block(S4, sizeof S4, priority);
//將消息壓入隊列中,enqueue_prio根據消息的優先級將消息放到適當的位置上
//enqueue_head只是簡單地將數據存入隊列中,而不考慮消息的優先級
//使用enqueue_prio壓入消息后,可以簡單通過dequeue_head和dequeue_tail
//分別按優先級從高到低和從低到高取消息
//如果使用enqueue_head和enqueue_tail壓入消息
//則需要通過dequeue_prio來按照消息的優先級依次將消息出隊列
//沒有必要既使用enqueue_prio壓入消息,又實用dequeue_prio來取消息
mq->enqueue_prio (mb1);
mq->enqueue_prio (mb2);
mq->enqueue_prio (mb3);
mq->enqueue_prio (mb4);
//輸出靜態消息隊列的相關信息
//高低水位默認值均為16384
ACE_DEBUG((LM_DEBUG, "count : %d, bytes : %d, length : %d, high_water_mark : %d, low_water_mark : %d, status : %d\n",
mq->message_count(), mq->message_bytes(), mq->message_length(),
mq->high_water_mark(), mq->low_water_mark(),
mq->state()));
ACE_Message_Block *mb;
//使用next遍歷消息,遍歷的順序為高優先級到底優先級
ACE_DEBUG((LM_DEBUG, "===========next=============\n"));
//peek一下,并不彈出消息,類似Windows的PeekMessage
mq->peek_dequeue_head(mb);
do
{
ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));
}while(mb = mb->next());
//使用迭代器遍歷消息隊列,遍歷的順序為高優先級到底優先級
ACE_DEBUG((LM_DEBUG, "=========iterator=============\n"));
ACE_Message_Queue<ACE_MT_SYNCH>::ITERATOR iterator (*mq);
for (ACE_Message_Block *entry = 0;
iterator.next (entry) != 0;
iterator.advance ())
{
ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", entry->rd_ptr(), entry->msg_priority()));
}
ACE_DEBUG((LM_DEBUG, "============dequeue_head==========\n"));
while(mq->dequeue_head (mb) != -1)
{
ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));
//這里如果不判斷的話,消息隊列空時會導致主線程被阻塞
if(mq->is_empty())
break;
}
ACE_DEBUG((LM_DEBUG, "\n\n"));
//測試高低水位和隊列的state使用,進行測試之前mq隊列已空///
//產生一個生產者線程
ACE_Thread_Manager::instance()->spawn_n
(
1,
(ACE_THR_FUNC) produce
);
產生兩個消費者線程
ACE_Thread_Manager::instance()->spawn_n
(
1,
(ACE_THR_FUNC) consume
);
//掛起主線程
ACE_Thread_Manager::instance()->wait();
return 0;
}
?
?
ACE中TCP通信
Tcp通信過程一般為如下步驟:
服務器綁定端口,等待客戶端連接。
客戶端通過服務器的ip和服務器綁定的端口連接服務器。
服務器和客戶端通過網絡建立一條數據通路,通過這條數據通路進行數據交互。
常用API:
1. ACE_INET_Addr類。
ACE"地址"類ACE_Addr的子類,表示TCP/IP和UDP/IP的地址。它通常包含機器的ip和端口信息,通過它可以定位到所通信的進程。
定義方式:
ACE_INET_Addr addInfo(3000,"192.168.1.100");
常用方法:
get_host_name??? 獲取主機名
get_ip_address??? 獲取ip地址
get_port_number??? 獲取端口號
2. ACE_SOCK_Acceptor類。
服務期端使用,用于綁定端口和被動地接受連接。
常用方法:
open 綁定端口
accept建立和客戶段的連接
3. ACE_SOCK_Connector類。
客戶端使用,用于主動的建立和服務器的連接。
常用方法:
connect()??? 建立和服務期的連接。
4. ACE_SOCK_Stream類。
客戶端和服務器都使用,表示客戶段和服務器之間的數據通路。
常用方法:
send ()??? 發送數據
recv ()??? 接收數據
close()??? 關閉連接(實際上就是斷開了socket連接)。
代碼示例:
下面例子演示了如何如何用ACE創建TCP通信的Server端。
#include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Stream.h" #include "ace/INET_Addr.h" #include "ace/OS.h"#include <string> #include <iostream> using namespace std;int main(int argc, char *argv[]) {ACE_INET_Addr port_to_listen(3000); //綁定的端口ACE_SOCK_Acceptor acceptor;if (acceptor.open (port_to_listen, 1) == -1) //綁定端口{cout<<endl<<"bind port fail"<<endl;return -1;}while(true){ACE_SOCK_Stream peer; //和客戶端的數據通路ACE_Time_Value timeout (10, 0);if (acceptor.accept (peer) != -1) //建立和客戶端的連接{cout<<endl<<endl<<"client connect. "<<endl;char buffer[1024];ssize_t bytes_received;ACE_INET_Addr raddr;peer.get_local_addr(raddr);cout<<endl<<"local port\t"<<raddr.get_host_name()<<"\t"<<raddr.get_port_number()<<endl;while ((bytes_received =peer.recv (buffer, sizeof(buffer))) != -1) //讀取客戶端發送的數據{peer.send(buffer, bytes_received); //對客戶端發數據}peer.close ();}}return 0; }這個例子實現的功能很簡單,服務器端綁定3000號端口,等待一個客戶端的連接,然后將從客戶端讀取的數據再次轉發給客戶端,也就是實現了一個EchoServer的功能。
相應的客戶端程序也比較簡單,代碼如下:
#include <ace/SOCK_Stream.h> #include <ace/SOCK_Connector.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> #include <string> #include <iostream> using namespace std;int main(int argc, char *argv[]) {ACE_INET_Addr addr(3000,"127.0.0.1");ACE_SOCK_Connector connector; ACE_Time_Value timeout(5,0);ACE_SOCK_Stream peer;if(connector.connect(peer,addr,&timeout) != 0){cout<<"connection failed !"<<endl;return 1;}cout<<"conneced !"<<endl;string s="hello world";peer.send(s.c_str(),s.length()); //發送數據cout<<endl<<"send:\t"<<s<<endl;ssize_t bc=0; //接收的字節數char buf[1024];bc=peer.recv(buf,1024,&timeout); //接收數據if(bc>=0){buf[bc]='\0';cout<<endl<<"rev:\t"<<buf<<endl;}peer.close();return 0; }?
ACE中UDP通信
udp是一種無連接的協議,提供無連接不可靠的服務。
在ace中,通過ACE_SOCK_Dgram類提供udp通信服務,ACE_SOCK_Dgram和ACE_SOCK_Stream的API非常類似,一樣提供了send,recv及close等常用操作,這里就不再累述了。
udp通信時無需像tcp那樣建立連接和關閉連接,tcp編程時需要通過accept和connect來建立連接,而udp通信省略了這一步驟,相對來說編程更為簡單。
由于udp通信時無建立連接,服務器端不能像Tcp通信那樣在建立連接的時候就獲得客戶端的地址信息,故服務器端不能主動對客戶端發送信息(不知道客戶端的地址),只有等到收到客戶端發送的udp信息時才能確定客戶端的地址信息,從而進行通信。
udp通信過程如下:
下面代碼為EchoServer的udp版:
//server.cpp #include <ace/SOCK_Dgram.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> #include <string> #include <iostream> using namespace std; int main(int argc, char *argv[]) { ACE_INET_Addr port_to_listen(3000); //綁定的端口 ACE_SOCK_Dgram peer(port_to_listen); //通信通道 char buf[100]; while(true) { ACE_INET_Addr remoteAddr; //所連接的遠程地址 int bc = peer.recv(buf,100,remoteAddr); //接收消息,獲取遠程地址信息 if( bc != -1) { string s(buf,bc); cout<<endl<<"rev:\t"<<s<<endl; } peer.send(buf,bc,remoteAddr); //和遠程地址通信 } return 0; }相應的客戶端程序如下:
//client.cpp #include <ace/SOCK_Dgram.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> #include <string> #include <iostream> using namespace std; int main(int argc, char *argv[]) { ACE_INET_Addr remoteAddr(3000,"127.0.0.1"); //所連接的遠程地址 ACE_INET_Addr localAddr; //本地地址信息 ACE_SOCK_Dgram peer(localAddr); //通信通道 peer.send("hello",5,remoteAddr); //發送消息 char buf[100]; int bc = peer.recv(buf,100,remoteAddr); //接收消息 if( bc != -1) { string s(buf,bc); cout<<endl<<"rev:\t"<<s<<endl; } return 0; }和tcp編程相比,udp無需通過acceptor,connector來建立連接,故代碼相對tcp編程來說要簡單許多。另外,由于udp是一種無連接的通信方式,ACE_SOCK_Dgram的實例對象中無法保存遠端地址信息(保存了本地地址信息),故通信的時候需要加上遠端地址信息。
?
ACE主動對象模式
主動對象模式用于降低方法執行和方法調用之間的耦合。該模式描述了另外一種更為透明的任務間通信方法。
傳統上,所有的對象都是被動的代碼段,對象中的代碼是在對它發出方法調用的線程中執行的,當方法被調用時,調用線程將阻塞,直至調用結束。而主動對象卻不一樣。這些對象具有自己的命令執行線程,主動對象的方法將在自己的執行線程中執行,不會阻塞調用方法。
例如,設想對象"A"已在你的程序的main()函數中被實例化。當你的程序啟動時,OS創建一個線程,以從main()函數開始執行。如果你調用對象A的任何方法,該線程將"流過"那個方法,并執行其中的代碼。一旦執行完成,該線程返回調用該方法的點并繼續它的執行。但是,如果"A"是主動對象,事情就不是這樣了。在這種情況下,主線程不會被主動對象借用。相反,當"A"的方法被調用時,方法的執行發生在主動對象持有的線程中。另一種思考方法:如果調用的是被動對象的方法(常規對象),調用會阻塞(同步的);而另一方面,如果調用的是主動對象的方法,調用不會阻塞(異步的)。
由于主動對象的方法調用不會阻塞,這樣就提高了系統響應速度,在網絡編程中是大有用武之地的。
在這里我們將一個"Logger"(日志記錄器)對象為例來介紹如何將一個傳統對象改造為主動對象,從而提高系統響應速度。
Logger的功能是將一些系統事件的記錄在存儲器上以備查詢,由于Logger使用慢速的I/O系統來記錄發送給它的消息,因此對Logger的操作將會導致系統長時間的等待。
其功能代碼簡化如下:
class Logger: public ACE_Task<ACE_MT_SYNCH>
{
public:
void LogMsg(const string& msg)
??? {
??????? cout<<endl<<msg<<endl;
??????? ACE_OS::sleep(2);
??? }
};
為了實現記錄日志操作的主動執行,我們需要用命令模式將其封裝,從而使得記錄日志的方法能在合適的時間和地方主動執行,封裝方式如下:
class LogMsgCmd: public ACE_Method_Object { public:LogMsgCmd(Logger *plog,const string& msg){this->log=plog;this->msg=msg;}int call(){this->log->LogMsg(msg);return 0;}private:Logger *log;string msg; };class Logger: public ACE_Task<ACE_MT_SYNCH> { public:void LogMsg(const string& msg){cout<<endl<<msg<<endl;ACE_OS::sleep(2);}LogMsgCmd *LogMsgActive(const string& msg){new LogMsgCmd(this,msg);} };?
?
這里對代碼功能做一下簡單的說明:
ACE_Method_Object是ACE提供的命令模式借口,命令接口調用函數為int call(),在這里通過它可以把每個操作日志的調用封裝為一個LogMsgCmd對象,這樣,當原來需要調用LogMsg的方法的地方只要調用LogMsgActive即可生成一個LogMsgCmd對象,由于調用LogMsgActive方法,只是對命令進行了封裝,并沒有進行日志操作,所以該方法會立即返回。然后再新開一個線程,將LogMsgCmd對象作為參數傳入,在該線程中執行LogMsgCmd對象的call方法,從而實現無阻塞調用。
然而,每次對一個LogMsg調用都開啟一個新線程,無疑是對資源的一種浪費,實際上我們往往將生成的LogMsgCmd對象插入一個命令隊列中,只新開一個命令執行線程依次執行命令隊列中的所有命令。并且,為了實現對象的封裝,命令隊列和命令執行線程往往也封裝到Logger對象中,代碼如下所示:
#include "ace/OS.h" #include "ace/Task.h" #include "ace/Method_Object.h" #include "ace/Activation_Queue.h" #include "ace/Auto_Ptr.h"#include <string> #include <iostream> using namespace std;class Logger: public ACE_Task<ACE_MT_SYNCH> { public:Logger(){this->activate();}int svc();void LogMsg(const string& msg);void LogMsgActive (const string& msg);private:ACE_Activation_Queue cmdQueue; //命令隊列 };class LogMsgCmd: public ACE_Method_Object { public:LogMsgCmd(Logger *plog,const string& msg){this->log=plog;this->msg=msg;}int call(){this->log->LogMsg(msg);return 0;}private:Logger *log;string msg; };void Logger::LogMsg(const string& msg) {cout<<endl<<msg<<endl;ACE_OS::sleep(2); }//以主動的方式記錄日志 void Logger::LogMsgActive(const string& msg) {//生成命令對象,插入到命令隊列中cmdQueue.enqueue(new LogMsgCmd(this,msg)); }int Logger::svc() {while(true){//遍歷命令隊列,執行命令auto_ptr<ACE_Method_Object> mo(this->cmdQueue.dequeue ());if (mo->call () == -1)break;}return 0; }int main (int argc, ACE_TCHAR *argv[]) {Logger log;log. LogMsgActive ("hello");ACE_OS::sleep(1);log.LogMsgActive("abcd");while(true)ACE_OS::sleep(1);return 0; }在這里需要注意一下命令隊列ACE_Activation_Queue對象,它是線程安全的,使用方法比較簡單,這里我也不多介紹了。
主動對象的基本結構就是這樣,然而,由于主動對象是異步調用的,又引出了如下兩個新問題:
通過ACE_Future對象來解決上述兩個問題的方法如下:
- 首先創建ACE_Future對象用以保留返回值。
- 調用主動命令時將ACE_Future對象作為參數傳入,生成的命令對象中保存ACE_Future對象的指針。
- 命令執行線程執行完命令后,將返回值通過set()函數設置到ACE_Future對象中。
- 調用線程可以通過ACE_Future對象的ready()函數查詢該命令是否執行完成,如果命令執行完成,則可通過get()函數來獲取返回值。
使用的時候要注意一下ACE_Future對象的生命周期。
為了演示了如何獲取主動命令的執行狀態和結果,我將上篇文章中的代碼改動了一下,日志類記錄日志后,會將記錄的內容作為返回值返回,該返回值會通過ACE_Future對象返回,代碼如下
#include "ace/OS.h" #include "ace/Task.h" #include "ace/Method_Object.h" #include "ace/Activation_Queue.h" #include "ace/Auto_Ptr.h"#include "ace/Future.h"#include <string> #include <iostream> using namespace std;class Logger: public ACE_Task<ACE_MT_SYNCH> { public:Logger(){this->activate();}int svc();string LogMsg(const string& msg);void LogMsgActive (const string& msg,ACE_Future<string> *result);private:ACE_Activation_Queue cmdQueue; //命令隊列 };class LogMsgCmd: public ACE_Method_Object { public:LogMsgCmd(Logger *plog,const string& msg,ACE_Future<string> *result){this->log=plog;this->msg=msg;this->result=result;}int call(){string reply = this->log->LogMsg(msg);result->set(reply);return 0;}private:ACE_Future<string> *result;Logger *log;string msg; };string Logger::LogMsg(const string& msg) {ACE_OS::sleep(2);cout<<endl<<msg<<endl;return msg; }//以主動的方式記錄日志 void Logger::LogMsgActive(const string& msg,ACE_Future<string> *result) {//生成命令對象,插入到命令隊列中cmdQueue.enqueue(new LogMsgCmd(this,msg,result)); }int Logger::svc() {while(true){//遍歷命令隊列,執行命令auto_ptr<ACE_Method_Object> mo(this->cmdQueue.dequeue ());if (mo->call () == -1)break;}return 0; }void get_info(ACE_Future<string> &fu) {string state = fu.ready()?"ready":"not ready";cout<<endl<<state<<endl;if(fu.ready()){string value;fu.get(value);cout<<"value:\t"<<value<<endl;} }int main (int argc, ACE_TCHAR *argv[]) {ACE_Future<string> result;Logger log;log.LogMsgActive ("hello",&result);while(true){get_info(result);if(result.ready())break;ACE_OS::sleep(1);}cout<<endl<<"cmd end"<<endl;while(true)ACE_OS::sleep(1);return 0; }這種查詢模式比較簡單有效,但存在一個問題:調用線程必須不斷輪詢ACE_Future對象以獲取返回值,這樣的效率比較低。可以通過觀察者模式解決這個問題:在ACE_Future對象上注冊一個觀察者,當ACE_Future對象的值發生改變(異步命令執行完成)時主動通知該觀察者,從而獲取返回值。
ACE中的觀察者模式可以通過ACE_Future_Observer來實現,使用方法如下:
通過觀察者模式,可以更有效,及時的獲取異步命令的返回值,但同時也增加了程序結構的復雜度并且難以調試,使用的時候應該根據需要選取合適的方式。
#include "ace/Future.h"#include <string> #include <iostream> using namespace std;class MyObserver:public ACE_Future_Observer<string> {virtual void update (const ACE_Future<string> &future){string value;future.get(value);cout<<endl<<"change:\t"<<value<<endl;} };int main(int argc, char *argv[]) {MyObserver obv;ACE_Future<string> fu;fu.attach(&obv);ACE_OS::sleep(3);fu.set("12345");while(true)ACE_OS::sleep(3);return 0; }?
ACE反應器(Reactor)模式
反應器(Reactor):用于事件多路分離和分派的體系結構模式
通常的,對一個文件描述符指定的文件或設備, 有兩種工作方式: 阻塞與非阻塞。所謂阻塞方式的意思是指, 當試圖對該文件描述符進行讀寫時, 如果當時沒有東西可讀,或者暫時不可寫, 程序就進入等待狀態, 直到有東西可讀或者可寫為止。而對于非阻塞狀態, 如果沒有東西可讀, 或者不可寫, 讀寫函數馬上返回, 而不會等待。
在前面的章節中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:當接收tcp數據時,如果遠端沒有數據可以讀,則會一直阻塞到讀到需要的數據為止。這種方式的傳輸和傳統的被動方法的調用類似,非常直觀,并且簡單有效,但是同樣也存在一個效率問題,如果你是開發一個面對著數千個連接的服務器程序,對每一個客戶端都采用阻塞的方式通信,如果存在某個非常耗時的讀寫操作時,其它的客戶端通信將無法響應,效率非常低下。
一種常用做法是:每建立一個Socket連接時,同時創建一個新線程對該Socket進行單獨通信(采用阻塞的方式通信)。這種方式具有很高的響應速度,并且控制起來也很簡單,在連接數較少的時候非常有效,但是如果對每一個連接都產生一個線程的無疑是對系統資源的一種浪費,如果連接數較多將會出現資源不足的情況。
另一種較高效的做法是:服務器端保存一個Socket連接列表,然后對這個列表進行輪詢,如果發現某個Socket端口上有數據可讀時(讀就緒),則調用該socket連接的相應讀操作;如果發現某個Socket端口上有數據可寫時(寫就緒),則調用該socket連接的相應寫操作;如果某個端口的Socket連接已經中斷,則調用相應的析構方法關閉該端口。這樣能充分利用服務器資源,效率得到了很大提高。
在Socket編程中就可以通過select等相關API實現這一方式。但直接用這些API控制起來比較麻煩,并且也難以控制和移植,在ACE中可以通過Reactor模式簡化這一開發過程。
反應器本質上提供一組更高級的編程抽象,簡化了事件驅動的分布式應用的設計和實現。除此而外,反應器還將若干不同種類的事件的多路分離集成到易于使用的API中。特別地,反應器對基于定時器的事件、信號事件、基于I/O端口監控的事件和用戶定義的通知進行統一地處理。
ACE中的反應器與若干內部和外部組件協同工作。其基本概念是反應器框架檢測事件的發生(通過在OS事件多路分離接口上進行偵聽),并發出對預登記事件處理器(event handler)對象中的方法的"回調"(callback)。該方法由應用開發者實現,其中含有應用處理此事件的特定代碼。
使用ACE的反應器,只需如下幾步:
創建事件處理器,以處理他所感興趣的某事件。
在反應器上登記,通知說他有興趣處理某事件,同時傳遞他想要用以處理此事件的事件處理器的指針給反應器。
隨后反應器框架將自動地:
在內部維護一些表,將不同的事件類型與事件處理器對象關聯起來。
在用戶已登記的某個事件發生時,反應器發出對處理器中相應方法的回調。
反應器模式在ACE中被實現為ACE_Reactor類,它提供反應器框架的功能接口。
如上面所提到的,反應器將事件處理器對象作為服務提供者使用。反應器內部記錄某個事件處理器的特定事件的相關回調方法。當這些事件發生時,反應器會創建這種事件和相應的事件處理器的關聯。
事件處理器就是需要通過輪詢發生事件改變的對象列表中的對象,如在上面的例子中就是連接的客戶端,每個客戶端都可以看成一個事件處理器。
就是反應器支持的事件,如Socket讀就緒,寫就緒。拿上面的例子來說,如果某個客戶端(事件處理器)在反應器中注冊了讀就緒事件,當客戶端給服務器發送一條消息的時候,就會觸發這個客戶端的數據可讀的回調函數。
在反應器框架中,所有應用特有的事件處理器都必須由ACE_Event_Handler的抽象接口類派生??梢酝ㄟ^重載相應的"handle_"方法實現相關的回調方法。
使用ACE_Reactor基本上有三個步驟:
下面我就以一個Socket客戶端的例子為例簡單的說明反應器的基本用法。
#include <ace/OS.h> #include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <string> #include <iostream> using namespace std;class MyClient:public ACE_Event_Handler { public:bool open(){ACE_SOCK_Connector connector;ACE_INET_Addr addr(3000,"127.0.0.1");ACE_Time_Value timeout(5,0);if(connector.connect(peer,addr,&timeout) != 0){cout<<endl<<"connecetd fail";return false;}ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);cout<<endl<<"connecetd ";return true;}ACE_HANDLE get_handle(void) const{return peer.get_handle();}int handle_input (ACE_HANDLE fd){int rev=0;ACE_Time_Value timeout(5,0);if((rev=peer.recv(buffer,1000,&timeout))>0){buffer[rev]='\0';cout<<endl<<"rev:\t"<<buffer<<endl;}return 3;}private:ACE_SOCK_Stream peer;char buffer[1024]; };int main(int argc, char *argv[]) {MyClient client;client.open();while(true){ACE_Reactor::instance()->handle_events(); }return 0; }在這個例子中,客戶端連接上服務器后,通過ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK)注冊了一個讀就緒的回調函數,當服務器端給客戶端發消息的時候,會自動觸發handle_input()函數,將接收到的信息打印出來。
在Socket編程中,常見的事件就是"讀就緒","寫就緒",通過對這兩個事件的捕獲分發,可以實現Socket中的異步操作。
Socket編程中的事件處理器
在前面我們已經介紹過,在ACE反應器框架中,任何都必須派生自ACE_Event_Handler類,并通過重載其相應會調事件處理函數來實現相應的回調處理的。在Socket編程中,我們通常需要重載的函數有
當I/O句柄(比如UNIX中的文件描述符)上的輸入可用時,反應器自動回調該方法。
當I/O設備的輸出隊列有可用空間時,反應器自動回調該方法。
當事件處理器中的事件從Reactor中移除的時候調用。
此外,為了使Reactor能通過I/O句柄找到對應的事件處理器,還必須重載其get_handle()方法以使得Reactor建立起I/O句柄和事件處理器的關聯。
使用Reactor框架。
下面我們將以一個客戶端的程序為例,介紹如何在Socket編程中使用Reactor框架。
一.建立一個客戶端對象(事件處理器)。
客戶端對象就是一個事件處理器,其聲明如下:
class Client:public ACE_Event_Handler { public: ACE_HANDLE get_handle(void) const; int handle_input (ACE_HANDLE fd); int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); ACE_SOCK_Stream& Peer(); private: ACE_SOCK_Stream peer; };在Client端中我只關心"讀就緒"事件,故只重載了handle_input函數(大多數應用下只需要重載handle_input函數)。另外,在客戶端還保存了一個ACE_SOCK_Stream的peer對象用來進行Socket通信,同時封裝了一個Peer()函數返回它的引用。
二.重載相應回調處理函數
ACE_SOCK_Stream& Client::Peer() { return peer; } ACE_HANDLE Client::get_handle(void) const { return peer.get_handle(); } int Client::handle_input (ACE_HANDLE fd) { int rev=0; if((rev = peer.recv(buffer,1000))>0) { buffer[rev]='\0'; cout<<endl<<"rev:\t"<<buffer<<endl; return 0; } else //Socket連接發生錯誤,返回-1,在Reactor中注銷事件,觸發handle_close函數 { return -1; } } int Client::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { cout<<endl<<"connecetd closed"; return ACE_Event_Handler::handle_close(handle,close_mask); }?
三.在Reactor中注冊事件
首先讓我們來看看相應的main函數的代碼:
int main(int argc, char *argv[]) {Client client;ACE_SOCK_Connector connector;ACE_INET_Addr addr(3000,"127.0.0.1");ACE_Time_Value timeout(5,0);if(connector.connect(client.Peer(),addr,&timeout) != 0){cout<<endl<<"connecetd fail";return 0;}ACE_Reactor::instance()->register_handler(&client,ACE_Event_Handler::READ_MASK);while(true) { ACE_Reactor::instance()->handle_events(); }return 0; }在這里可以看到,使用Reactor框架后,依然首先通過ACE_SOCK_Connector的connect函數來建立連接。建立連接后,可以通過ACE_Reactor::instance()->register_handler函數來實現Reactor的注冊,實現I/O事件和Client對象的handle_input方法相關聯,它的第一個參數是事件處理器的地址,第二個參數是事件類型,由于這里只關心讀就緒事件,故注冊的事件類型是ACE_Event_Handler::READ_MASK。
四.啟動Reactor事件循環
通過如上設置后,我們就可以通過ACE_Reactor::instance()->handle_events()啟動Reactor循環了,這樣,每當服務器端有數據發送給客戶端時,當客戶端的數據就緒時,就回觸發Client對象的handle_input函數,將接收的數據打印出來。
通常的做法是,將Reactor事件循環作為一個單獨的線程來處理,這樣就不會阻塞main函數。
五.注銷Reactor事件
Reactor事件的注銷一般有兩種方式,顯式和隱式,下面將分別給予介紹。
當Reactor捕獲事件后,會觸發相應的"handle_"處理函數,當"handle_"處理函數返回值大于或等于0時,表示處理事件成功,當返回值小于0時,表示處理事件失敗,這時Reactor會自動注銷該句柄所注冊的所有事件,并觸發handle_close函數,以執行相應的資源清理工作。
在本例中,當handle_input函數里的recv函數接收到0個數時,說明socket發生錯誤(大多為Socket連接中斷),此時返回-1,則系統自動注銷相應事件。
調用Reactor對象的remove_handler方法移除,它有兩個參數,第一個是所注冊的事件反應器對象,第二個是所要注銷的事件。
在這個示例程序里,連接方只有一個Socket連接,Reactor的優勢并沒有體現出來,但在一些網絡管理系統里,連接方需要對多個需要管理的設備(服務器端)進行連接,在這種情況下使用Reactor模式,只需要多開一個Reactor事件循環線程就能實現事件多路分發復用,并且不會阻塞,通過面向對象的回調方式管理,使用起來非常方便。
Reactor框架的另外一個常用的地方就是服務器端,一般是一個服務器端對應多個客戶端,這樣用Reactor模式能大幅提高并發能力,這方面的編程方法將在下一章給與介紹。
?
在服務器端使用Reactor框架
使用Reactor框架的服務器端結構如下:
服務器端注冊兩種事件處理器,ClientAcceptor和ClientService ,ClientService類負責和客戶端的通信,每一個ClientService對象對應一個客戶端的Socket連接。 ClientAcceptor專門負責被動接受客戶端的連接,并創建ClientService對象。這樣,在一個N個Socket連接的服務器程序中,將存在1個ClientAcceptor對象和N個ClientService對象。
整個服務器端流程如下:
首先創建一個ClientAcceptor對象,該對象在Reactor上注冊ACCEPT_MASK事件,Reactor將自動在監聽端口建立Socket監聽。
如果有對該端口的Socket連接時,Reactor將自動回調handle_input方法,ClientAcceptor重載此方法,并創建一個ClientService對象,用于處理和Client的通信。
ClientService對象根據服務器的具體功能實現,其處理過程和客戶端程序類似,注冊相應的回調事件并分發即可。
代碼如下:
#include <ace/OS.h> #include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <ace/SOCK_Acceptor.h> #include <ace/Auto_Ptr.h> class ClientService : public ACE_Event_Handler { public: ACE_SOCK_Stream &peer (void) { return this->sock_; } int open (void) { //注冊讀就緒回調函數 return this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { //一個簡單的EchoServer,將客戶端的信息返回 int rev = peer().recv(buf,100); if(rev<=0) return -1; peer().send(buf,rev); return 0; } // 釋放相應資源 virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask) { if (mask == ACE_Event_Handler::WRITE_MASK) return 0; mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, mask); this->sock_.close (); delete this; //socket出錯時,將自動刪除該客戶端,釋放相應資源 return 0; } protected: char buf[100]; ACE_SOCK_Stream sock_; }; class ClientAcceptor : public ACE_Event_Handler { public: virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE, 0);} int open (const ACE_INET_Addr &listen_addr) { if (this->acceptor_.open (listen_addr, 1) == -1) { ACE_OS::printf("open port fail"); return -1; } //注冊接受連接回調事件 return this->reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->acceptor_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { ClientService *client = new ClientService(); auto_ptr<ClientService> p (client); if (this->acceptor_.accept (client->peer ()) == -1) { ACE_OS::printf("accept client fail"); return -1; } p.release (); client->reactor (this->reactor ()); if (client->open () == -1) client->handle_close (ACE_INVALID_HANDLE, 0); return 0; } virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE) { ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, m); this->acceptor_.close (); } return 0; } protected: ACE_SOCK_Acceptor acceptor_; }; int main(int argc, char *argv[]) { ACE_INET_Addr addr(3000,"192.168.1.142"); ClientAcceptor server; server.reactor(ACE_Reactor::instance()); server.open(addr); while(true) { ACE_Reactor::instance()->handle_events(); } return 0; }代碼功能比較簡單,需要注意以下幾點:
這里注冊事件的方式和前面的文章中方式不一樣,是通過ACE_Event_Handler類的reactor()方法設置和獲取reactor的指針,比較直觀和方便。前面的文章是通過ACE_Reactor::instance()來獲取的一個單體reactor的指針。
定時器的實現
通過Reactor機制,還可以很容易的實現定時器的功能,使用方式如下。
編寫一個事件反應器,重載handle_timeout()方法,該方法是定時器的觸發時間到時,會自動觸發該方法。
通過Reactor的schedule_timer()方法注冊定時器。
啟動reacotr的handle_events()事件分發循環。
當不想使用定時器時,可以通過Reactor的cancel_timer()方法注銷定時器。
下面的代碼簡單的實現了一個定時器,并具有基本的開啟,關閉功能。
#include <ace/OS.h> #include <ace/Reactor.h> class MyTimerHandler : public ACE_Event_Handler { private: int inteval; //執行時間間隔 int delay; //延遲執行時間 int timerid; public: MyTimerHandler(int delay,int inteval) { this->delay=delay; this->inteval=inteval; } int open() //注冊定時器 { ACE_Time_Value delaytime(inteval); ACE_Time_Value intevaltime(inteval); timerid = reactor()->schedule_timer(this, 0, //傳遞handle_timeout給的參數 delaytime, intevaltime); return timerid; } int close() //取消定時器 { return reactor()->cancel_timer(timerid); } //定時器回調函數 int handle_timeout (const ACE_Time_Value ¤t_time, const void * = 0) { time_t epoch = ((timespec_t)current_time).tv_sec; ACE_DEBUG ((LM_INFO, ACE_TEXT ("handle_timeout: %s\n"), ACE_OS::ctime (&epoch))); return 0; } }; int main(int argc, char *argv[]) { MyTimerHandler * timer = new MyTimerHandler (3,5); timer->reactor(ACE_Reactor::instance()); timer->open(); for(int i=0;i<2;i++) //觸發次handle_timeout事件 { ACE_OS::printf("\n%d\n",i); ACE_Reactor::instance()->handle_events(); } timer->close(); ACE_OS::printf("cancel timer"); while(true) ACE_Reactor::instance()->handle_events(); return 0; }代碼功能比較簡單,這里就不多做介紹了。
轉載于:https://www.cnblogs.com/dubingsky/archive/2009/07/22/1528292.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
- 上一篇: Web站点下的Web Service读取
- 下一篇: 得到计算机名