生活随笔
收集整理的這篇文章主要介紹了
一个Linux下C线程池的实现(转)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1.線程池基本原理
? 在傳統服務器結構中, 常是 有一個總的 監聽線程監聽有沒有新的用戶連接服務器, 每當有一個新的 用戶進入, 服務器就開啟一個新的線程用戶處理這 個用戶的數據包。這個線程只服務于這個用戶 , 當 用戶與服務器端關閉連接以后, 服務器端銷毀這個線程。然而頻繁地開辟與銷毀線程極大地占用了系統的資源。而且在大量用戶的情況下, 系統為了開辟和銷毀線程將浪費大量的時間和資源。線程池提供了一個解決外部大量用戶與服務器有限資源的矛盾, 線程池和傳統的一個用戶對應一 個線程的處理方法不同, 它的基本思想就是在程序 開始時就在內存中開辟一些線程, 線程的數目是 固定的,他們獨自形成一個類, 屏蔽了對外的操作, 而服務器只需要將數據包交給線程池就可以了。當有新的客戶請求到達時 , 不是新創建一個線程為其服務 , 而是從“池子”中選擇一個空閑的線程為新的客戶請求服務 ,服務完畢后 , 線程進入空閑線程池中。如果沒有線程空閑 的 話, 就 將 數 據 包 暫 時 積 累 , 等 待 線 程 池 內 有 線 程空閑以后再進行處理。通過對多個任務重用已經存在的線程對象 , 降低了對線程對象創建和銷毀的開銷。當客戶請求 時 , 線程對象 已 經 存 在 , 可 以 提 高 請 求 的響應時間 , 從而整體地提高了系統服務的表現。
??一般來說實現一個線程池主要包括以下幾個組成部分:
1)線程管理器:用于創建并管理線程池。
2)工作線程:線程池中實際執行任務的線程。在初始化線程時會預先創建好固定數目的線程在池中,這些初始化的線程一般處于空閑狀態,一般不占用CPU,占用較小的內存空間。
3)任務接口:每個任務必須實現的接口,當線程池的任務隊列中有可執行任務時,被空閑的工作線程調去執行(線程的閑與忙是通過互斥量實現的,跟前面文章中的設置標志位差不多),把任務抽象出來形成接口,可以做到線程池與具體的任務無關。
4)任務隊列:用來存放沒有處理的任務,提供一種緩沖機制,實現這種結構有好幾種方法,常用的是隊列,主要運用先進先出原理,另外一種是鏈表之類的數據結構,可以動態的為它分配內存空間,應用中比較靈活,下文中就是用到的鏈表。
下面的不在贅述百度《線程池技術在并發服務器中的應用》寫的非常詳細!
轉自:http://blog.csdn.net/zouxinfox/article/details/3560891
? 什么時候需要創建線程池呢?簡單的說,如果一個應用需要頻繁的創建和銷毀線程,而任務執行的時間又非常短,這樣線程創建和銷毀的帶來的開銷就不容忽視,這時也是線程池該出場的機會了。如果線程創建和銷毀時間相比任務執行時間可以忽略不計,則沒有必要使用線程池了。 ?? ?下面是Linux系統下用C語言創建的一個線程池。線程池會維護一個任務鏈表(每個CThread_worker結構就是一個任務)。 ?? ?pool_init()函數預先創建好max_thread_num個線程,每個線程執thread_routine ()函數。該函數中
while ?(pool->cur_queue_size?==?0) { ??????pthread_cond_wait?(&(pool->queue_ready),&(pool->queue_lock)); }
表示如果任務鏈表中沒有任務,則該線程出于阻塞等待狀態。否則從隊列中取出任務并執行。
?? ?
?? ?pool_add_worker()函數向線程池的任務鏈表中加入一個任務,加入后通過調用pthread_cond_signal (&(pool->queue_ready))喚醒一個出于阻塞狀態的線程(如果有的話)。
?? ?
?? ?pool_destroy ()函數用于銷毀線程池,線程池任務鏈表中的任務不會再被執行,但是正在運行的線程會一直把任務運行完后再退出。
? ??
[cpp] ?view plain
?copy #include?<stdio.h> ?? #include?<stdlib.h> ?? #include?<unistd.h> ?? #include?<sys/types.h> ?? #include?<pthread.h> ?? #include?<assert.h> ?? ?? ? ? ? ?? typedef ? struct ?worker?? {?? ?????? ????void ?*(*process)?( void ?*arg);?? ????void ?*arg; ?? ????struct ?worker?*next;?? ?? }?CThread_worker;?? ?? ?? ?? ?? typedef ? struct ?? {?? ????pthread_mutex_t?queue_lock;?? ????pthread_cond_t?queue_ready;?? ?? ?????? ????CThread_worker?*queue_head;?? ?? ?????? ????int ?shutdown;?? ????pthread_t?*threadid;?? ?????? ????int ?max_thread_num;?? ?????? ????int ?cur_queue_size;?? ?? }?CThread_pool;?? ?? ?? ?? int ?pool_add_worker?( void ?*(*process)?( void ?*arg),? void ?*arg);?? void ?*thread_routine?( void ?*arg);?? ?? ?? ?? static ?CThread_pool?*pool?=?NULL;?? void ?? pool_init?(int ?max_thread_num)?? {?? ????pool?=?(CThread_pool?*)?malloc?(sizeof ?(CThread_pool));?? ?? ????pthread_mutex_init?(&(pool->queue_lock),?NULL);?? ????pthread_cond_init?(&(pool->queue_ready),?NULL);?? ?? ????pool->queue_head?=?NULL;?? ?? ????pool->max_thread_num?=?max_thread_num;?? ????pool->cur_queue_size?=?0;?? ?? ????pool->shutdown?=?0;?? ?? ????pool->threadid?=?(pthread_t?*)?malloc?(max_thread_num?*?sizeof ?(pthread_t));?? ????int ?i?=?0;?? ????for ?(i?=?0;?i?<?max_thread_num;?i++)?? ????{??? ????????pthread_create?(&(pool->threadid[i]),?NULL,?thread_routine,NULL);?? ????}?? }?? ?? ?? ?? ?? int ?? pool_add_worker?(void ?*(*process)?( void ?*arg),? void ?*arg)?? {?? ?????? ????CThread_worker?*newworker?=?(CThread_worker?*)?malloc?(sizeof ?(CThread_worker));?? ????newworker->process?=?process;?? ????newworker->arg?=?arg;?? ????newworker->next?=?NULL;?? ?? ????pthread_mutex_lock?(&(pool->queue_lock));?? ?????? ????CThread_worker?*member?=?pool->queue_head;?? ????if ?(member?!=?NULL)?? ????{?? ????????while ?(member->next?!=?NULL)?? ????????????member?=?member->next;?? ????????member->next?=?newworker;?? ????}?? ????else ?? ????{?? ????????pool->queue_head?=?newworker;?? ????}?? ?? ????assert?(pool->queue_head?!=?NULL);?? ?? ????pool->cur_queue_size++;?? ????pthread_mutex_unlock?(&(pool->queue_lock));?? ????? ?? ????pthread_cond_signal?(&(pool->queue_ready));?? ????return ?0;?? }?? ?? ?? ?? ? ?? int ?? pool_destroy?()?? {?? ????if ?(pool->shutdown)?? ????????return ?-1; ?? ????pool->shutdown?=?1;?? ?? ?????? ????pthread_cond_broadcast?(&(pool->queue_ready));?? ?? ?????? ????int ?i;?? ????for ?(i?=?0;?i?<?pool->max_thread_num;?i++)?? ????????pthread_join?(pool->threadid[i],?NULL);?? ????free?(pool->threadid);?? ?? ?????? ????CThread_worker?*head?=?NULL;?? ????while ?(pool->queue_head?!=?NULL)?? ????{?? ????????head?=?pool->queue_head;?? ????????pool->queue_head?=?pool->queue_head->next;?? ????????free?(head);?? ????}?? ?????? ????pthread_mutex_destroy(&(pool->queue_lock));?? ????pthread_cond_destroy(&(pool->queue_ready));?? ?????? ????free?(pool);?? ?????? ????pool=NULL;?? ????return ?0;?? }?? ?? ?? ?? void ?*?? thread_routine?(void ?*arg)?? {?? ????printf?("starting?thread?0x%x\n" ,?pthread_self?());?? ????while ?(1)?? ????{?? ????????pthread_mutex_lock?(&(pool->queue_lock));?? ????????? ?? ????????while ?(pool->cur_queue_size?==?0?&&?!pool->shutdown)?? ????????{?? ????????????printf?("thread?0x%x?is?waiting\n" ,?pthread_self?());?? ????????????pthread_cond_wait?(&(pool->queue_ready),?&(pool->queue_lock));?? ????????}?? ?? ?????????? ????????if ?(pool->shutdown)?? ????????{?? ?????????????? ????????????pthread_mutex_unlock?(&(pool->queue_lock));?? ????????????printf?("thread?0x%x?will?exit\n" ,?pthread_self?());?? ????????????pthread_exit?(NULL);?? ????????}?? ?? ????????printf?("thread?0x%x?is?starting?to?work\n" ,?pthread_self?());?? ?? ?????????? ????????assert?(pool->cur_queue_size?!=?0);?? ????????assert?(pool->queue_head?!=?NULL);?? ?????????? ?????????? ????????pool->cur_queue_size--;?? ????????CThread_worker?*worker?=?pool->queue_head;?? ????????pool->queue_head?=?worker->next;?? ????????pthread_mutex_unlock?(&(pool->queue_lock));?? ?? ?????????? ????????(*(worker->process))?(worker->arg);?? ????????free?(worker);?? ????????worker?=?NULL;?? ????}?? ?????? ????pthread_exit?(NULL);?? }?? ?? ?? ?? void ?*?? myprocess?(void ?*arg)?? {?? ????printf?("threadid?is?0x%x,?working?on?task?%d\n" ,?pthread_self?(),*( int ?*)?arg);?? ????sleep?(1);?? ????return ?NULL;?? }?? ?? int ?? main?(int ?argc,? char ?**argv)?? {?? ????pool_init?(3);?? ?????? ?????? ????int ?*workingnum?=?( int ?*)?malloc?( sizeof ?( int )?*?10);?? ????int ?i;?? ????for ?(i?=?0;?i?<?10;?i++)?? ????{?? ????????workingnum[i]?=?i;?? ????????pool_add_worker?(myprocess,?&workingnum[i]);?? ????}?? ?????? ????sleep?(5);?? ?????? ????pool_destroy?();?? ?? ????free?(workingnum);?? ????return ?0;?? }??
將上述所有代碼放入threadpool.c文件中,
在Linux輸入編譯命令
$ gcc -o threadpool threadpool.c -lpthread
以下是運行結果
starting thread 0xb7df6b90
thread 0xb7df6b90 is waiting
starting thread 0xb75f5b90
thread 0xb75f5b90 is waiting
starting thread 0xb6df4b90
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 0
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 1
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 2
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 3
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 4
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 5
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 6
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 7
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 8
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 9
thread 0xb75f5b90 is waiting
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is waiting
thread 0xb75f5b90 will exit
thread 0xb6df4b90 will exit
thread 0xb7df6b90 will exit
總結
以上是生活随笔 為你收集整理的一个Linux下C线程池的实现(转) 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。