生活随笔
收集整理的這篇文章主要介紹了
Linux C++ 实现线程池
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
http://blog.csdn.net/qq_25425023/article/details/53914609
線程池中的線程,在任務隊列為空的時候,等待任務的到來,任務隊列中有任務時,則依次獲取任務來執行,任務隊列需要同步。
Linux線程同步有多種方法:互斥量、信號量、條件變量等。
下面是根據互斥量、信號量、條件變量封裝的三個類。
線程池中用到了互斥量和信號量。
?
[cpp] view plain
#ifndef _LOCKER_H_
#define _LOCKER_H_

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

/**
 * Wrapper around an unnamed, process-private POSIX semaphore whose initial
 * count is 0.
 *
 * If sem_init() fails the error is printed and the object is left in a
 * "not ready" state: wait() and add() then return false (with errno set to
 * EINVAL) instead of operating on an uninitialised semaphore.
 */
class sem_locker
{
private:
    sem_t m_sem;
    bool m_ready;   // true only when sem_init() succeeded

    // Non-copyable: copying a sem_t is undefined. Declared, never defined.
    sem_locker(const sem_locker &);
    sem_locker &operator=(const sem_locker &);

public:
    /** Initialises the semaphore with a count of 0. */
    sem_locker() : m_ready(sem_init(&m_sem, 0, 0) == 0)
    {
        if(!m_ready)
            printf("sem init error\n");
    }

    /** Destroys the semaphore, but only if it was successfully created. */
    ~sem_locker()
    {
        if(m_ready)
            sem_destroy(&m_sem);
    }

    /**
     * Blocks until the count is positive, then decrements it.
     * @return true on success; false if sem_wait() failed (errno is set,
     *         e.g. EINTR) or the semaphore was never initialised.
     */
    bool wait()
    {
        if(!m_ready)
        {
            errno = EINVAL;   // mimic sem_wait() on an invalid semaphore
            return false;
        }
        return sem_wait(&m_sem) == 0;
    }

    /**
     * Increments the count, waking one waiter if any.
     * @return true on success, false otherwise.
     */
    bool add()
    {
        if(!m_ready)
        {
            errno = EINVAL;
            return false;
        }
        return sem_post(&m_sem) == 0;
    }
};


/**
 * Wrapper around a default-attribute pthread mutex.
 *
 * If pthread_mutex_init() fails the error is printed and every later
 * operation returns false instead of touching the uninitialised mutex.
 */
class mutex_locker
{
private:
    pthread_mutex_t m_mutex;
    bool m_ready;   // true only when pthread_mutex_init() succeeded

    // Non-copyable: copying a pthread_mutex_t is undefined.
    mutex_locker(const mutex_locker &);
    mutex_locker &operator=(const mutex_locker &);

public:
    /** Creates the mutex with default attributes. */
    mutex_locker() : m_ready(pthread_mutex_init(&m_mutex, NULL) == 0)
    {
        if(!m_ready)
            printf("mutex init error!");
    }

    /** Destroys the mutex, but only if it was successfully created. */
    ~mutex_locker()
    {
        if(m_ready)
            pthread_mutex_destroy(&m_mutex);
    }

    /** Locks the mutex. @return true on success. */
    bool mutex_lock()
    {
        return m_ready && pthread_mutex_lock(&m_mutex) == 0;
    }

    /** Unlocks the mutex. @return true on success. */
    bool mutex_unlock()
    {
        return m_ready && pthread_mutex_unlock(&m_mutex) == 0;
    }
};


/**
 * Condition variable bundled with the mutex it waits on.
 *
 * Each primitive is tracked separately, so the destructor destroys exactly
 * what the constructor managed to create (the earlier version could destroy
 * the mutex twice and destroy a condition variable that was never created).
 *
 * NOTE: wait() checks no predicate. A signal() issued before wait() is not
 * remembered, and POSIX allows spurious wakeups, so callers need their own
 * state check around wait().
 */
class cond_locker
{
private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
    bool m_mutex_ready;   // pthread_mutex_init() succeeded
    bool m_cond_ready;    // pthread_cond_init() succeeded

    // Non-copyable: the underlying pthread objects must not be copied.
    cond_locker(const cond_locker &);
    cond_locker &operator=(const cond_locker &);

public:
    /** Creates the mutex and the condition variable, reporting failures. */
    cond_locker() : m_mutex_ready(false), m_cond_ready(false)
    {
        m_mutex_ready = (pthread_mutex_init(&m_mutex, NULL) == 0);
        if(!m_mutex_ready)
            printf("mutex init error");

        m_cond_ready = (pthread_cond_init(&m_cond, NULL) == 0);
        if(!m_cond_ready)
            printf("cond init error");
    }

    /** Destroys whichever primitives were successfully created. */
    ~cond_locker()
    {
        if(m_cond_ready)
            pthread_cond_destroy(&m_cond);
        if(m_mutex_ready)
            pthread_mutex_destroy(&m_mutex);
    }

    /**
     * Blocks until the condition variable is signalled.
     * @return true if pthread_cond_wait() returned 0; false on any failure,
     *         including a failed lock or an uninitialised object.
     */
    bool wait()
    {
        if(!m_mutex_ready || !m_cond_ready)
            return false;
        // pthread_cond_wait() requires the mutex to be held by the caller.
        if(pthread_mutex_lock(&m_mutex) != 0)
            return false;
        const int ans = pthread_cond_wait(&m_cond, &m_mutex);
        pthread_mutex_unlock(&m_mutex);
        return ans == 0;
    }

    /** Wakes at most one waiter. @return true on success. */
    bool signal()
    {
        return m_cond_ready && pthread_cond_signal(&m_cond) == 0;
    }
};

#endif
// Next comes the thread-pool class, which is a class template:
[cpp] view plain
?copy #ifndef?_PTHREAD_POOL_?? #define?_PTHREAD_POOL_?? ?? #include?"locker.h"?? #include?<list>?? #include?<stdio.h>?? #include?<exception>?? #include?<errno.h>?? #include?<pthread.h>?? #include?<iostream>?? ?? template<class?T>?? class?threadpool?? {?? private:?? ????int?thread_number;???? ????int?max_task_number;???? ????pthread_t?*all_threads;????? ????std::list<T?*>?task_queue;??? ????mutex_locker?queue_mutex_locker;???? ????sem_locker?queue_sem_locker;????? ????bool?is_stop;??? public:?? ????threadpool(int?thread_num?=?20,?int?max_task_num?=?30);?? ????~threadpool();?? ????bool?append_task(T?*task);?? ????void?start();?? ????void?stop();?? private:?? ?????? ????static?void?*worker(void?*arg);?? ????void?run();?? };?? ?? template?<class?T>?? threadpool<T>::threadpool(int?thread_num,?int?max_task_num):?? ????thread_number(thread_num),?max_task_number(max_task_num),?? ????is_stop(false),?all_threads(NULL)?? {?? ????if((thread_num?<=?0)?||?max_task_num?<=?0)?? ????printf("threadpool?can't?init?because?thread_number?=?0"?? ????????"?or?max_task_number?=?0");?? ?? ????all_threads?=?new?pthread_t[thread_number];?? ????if(!all_threads)?? ????????printf("can't?init?threadpool?because?thread?array?can't?new");?? }?? ?? template?<class?T>?? threadpool<T>::~threadpool()?? {?? ????delete?[]all_threads;?? ????is_stop?=?true;?? }?? ?? template?<class?T>?? void?threadpool<T>::stop()?? {?? ????is_stop?=?true;?? ?????? }?? ?? template?<class?T>?? void?threadpool<T>::start()?? {?? ????for(int?i?=?0;?i?<?thread_number;?++i)?? ????{?? ????printf("create?the?%dth?pthread\n",?i);?? ????if(pthread_create(all_threads?+?i,?NULL,?worker,?this)?!=?0)?? ????{?? ????????delete?[]all_threads;?? ????????throw?std::exception();?? ????}?? ????if(pthread_detach(all_threads[i]))?? ????{?? ????????delete?[]all_threads;?? ????????throw?std::exception();?? ????}?? ????}?? }?? ?? template?<class?T>?? bool?threadpool<T>::append_task(T?*task)?? {????? ????queue_mutex_locker.mutex_lock();?? ?????? 
????if(task_queue.size()?>?max_task_number)?? ????{?? ????queue_mutex_locker.mutex_unlock();?? ????return?false;?? ????}?? ?????? ????task_queue.push_back(task);?? ????queue_mutex_locker.mutex_unlock();?? ?????? ????queue_sem_locker.add();?? ????return?true;?? }?? ?? template?<class?T>?? void?*threadpool<T>::worker(void?*arg)?? {?? ????threadpool?*pool?=?(threadpool?*)arg;?? ????pool->run();?? ????return?pool;?? }?? ?? template?<class?T>?? void?threadpool<T>::run()?? {?? ????while(!is_stop)?? ????{????? ????queue_sem_locker.wait();?????? ????if(errno?==?EINTR)?? ????{?? ????????printf("errno");?? ????????continue;?? ????}?? ?????? ????queue_mutex_locker.mutex_lock();?? ?????? ????if(task_queue.empty())?? ????{?? ????????queue_mutex_locker.mutex_unlock();?? ????????continue;?? ????}?? ?????? ????T?*task?=?task_queue.front();?? ????task_queue.pop_front();?? ????queue_mutex_locker.mutex_unlock();?? ????if(!task)?? ????????continue;?? ?? ????task->doit();???? ????}?? ?????? ????printf("close?%ld\n",?(unsigned?long)pthread_self());?? }?? ?? #endif??
以上參考《Linux高性能服務器編程》
寫個程序對線程池進行測試:
[cpp] view plain
?copy #include?<stdio.h>?? #include?<iostream>?? #include?<unistd.h>?? ?? #include?"thread_pool.h"?? ?? class?task?? {?? private:?? ????int?number;?? ?? public:?? ????task(int?num)?:?number(num)?? ????{?? ????}?? ????~task()?? ????{?? ????}?? ?? ????void?doit()?? ????{?? ????printf("this?is?the?%dth?task\n",?number);?? ????}?? };?? ?? int?main()?? {?? ????task?*ta;?? ????threadpool<task>?pool(10,?15);?? ?? ????for(int?i?=?0;?i?<?20;?++i)?? ????{?? ????ta?=?new?task(i);?? ?? ????pool.append_task(ta);?? ????}?? ????pool.start();?? ????sleep(10);?? ????printf("close?the?thread?pool\n");?? ????pool.stop();?? ????pause();?? ????return?0;?? }?? 經測試,線程池可以正常使用。
下一篇博客,使用線程池來實現回射服務器,測試可以達到多大的并發量。
總結
以上是生活随笔為你收集整理的Linux C++ 实现线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。