用C++11 实现 thread pool
最近看了看我的計劃,寫道這里也算是到了一半,大部分都是講的單一的C++11的用法,基本都是理論知識,就像我上大學的時候,老師一直講理論知識,結局就是能去能不去的時候,我選擇了后者。所以在這里穿插一下小的綜合運用文章,讓大家知道為什么要用C++11,C++11好在哪里,項目中如何運用C++11.
首先介紹一下背景。在我們的工作中,避免不了多線程之間的配合。在現在處理器動輒8核16核的背景下,如果我們的程序還停留在單線程的模型,那么我們就沒法享受多處理器帶來的性能提成。之前看過我司代碼中的threadpool。寫的那叫一個滴水不漏,每個小細節都有大量的代碼去實現。不但非常冗長,而且以我的智商基本上讀不懂。唯一的有點就是:真穩定。不過threadpool的模型已經很難融入現代C++了。
所以有必要通過C++11來重新實現一下threadpool,對比一下modern
C++和C98.
1. 為什么要有threadpool?
如果談論threadpool,你會想到有什么功能呢?
傳統的模型大概是這樣的,把一個函數指針傳給threadpool。然后thread們會在合適的時候調用這個函數。那么還有一個問題就是函數的返回值怎么傳遞回調用的線程。這個功能往往有很多種方法,我司的思路就是調用你的callback將函數返回值返回給你。當然不是返回給調用函數的線程。
以上的描述中反映的threadpool的兩個最基本的需求:
?可以把一個可執行的對象扔給threadpool去執行。
可以把執行的返回值帶回。
其實這就是threadpool存在的合理性-- 把工作扔給它,我只要知道結果就行。當然任務扔給threadpool后,你就可以去干一些別的工作。
有人會說,扔給threadpool,無非是讓別的線程去干活,干的總活并沒有減少。相反,一些threadpool的開銷反而讓工作變的更慢。至于這個問題我想用redis來舉例子。
眾所周知,redis最新版本支持的多線程。redis的作者在解釋為什么引入多線程的時候說過。在他們維護redis的時候,發現redis的瓶頸竟然出現在分配內存上(從socket上拷貝內存)。所以你會發現redis起了多線,只是為了加速內存拷貝,最終的邏輯還是在一個線程執行的。所以可以看出,可以把較慢的代碼或者可以流水操作的代碼讓不同的線程執行。
?
2.?現代化threadpool提出了什么更高的要求?
之前我們分享過std::function。std::function 是C++11提供的可執行代碼的包裝器,它可以是一個普通函數,或者是函數指針,或者是lambda...,所以對于我們來說,threadpool也要支持std::function能支持的類型。
關于返回值,還有如何返回到calling thread,之前我們也分享過std::future.
還有就是線程間的同步,之前我們分享過 std::condition_variable。
還有就是thread的包裝器std::packaged_task.
至此我們湊齊了實現threadpool的幾大件,下面我們看看如何來實現它
3. 原理:
3.1 對象定義
要實現一個threadpool。我們要有以下的信息:
我們要有個結構體,記住我們控制的thread。
我們要有個結構體,記住我們要做的事情。
我們要有個condition_variable來做線程間同步。
為了優雅的推出,我們要有個標志位,標志著我現在想推出了,大家都退下吧。
功能上要有:
構造函數
析構函數
最重要的 -- 添加任務的函數
3.2?初始化
這里構建了我們需要的thread。并把它放在一個vector里。
這個thread只干一件事,那就是等condition_variable的通知,如果有通知,那么從task queue里邊拿出一個task,并執行該task。
當然還有一些判斷是否退出的邏輯。
inline ThreadPool::ThreadPool(size_t threads): stop(false) {for (size_t i = 0; i < threads; ++i)workers.emplace_back([this] {for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}}); }4. 添加任務API
說到函數,不可避免的就是函數的實現和函數的參數,為了實現支持不同的類型,我們選擇了用模板來適配。
同時為了得到返回值,我們將返回值設置成了future。那么問題來了,這該如何實現?是不是想起了packaged_task??如果忘了,回憶一下吧。
packaged_task可以將可執行的工作打包,然后獲取它的future。
至此我們就可以實現我們的功能了。思路就是來了一個可執行的工作,首先封裝成packaged_task。然后把這個task放到task queue中。并且通知一個線程說queue里邊有東西了,趕緊去干活。
在返回之前,得到它的future并返回。
實現如下:
template <class F, class... Args> auto ThreadPool::enqueue(F &&f, Args &&... args)-> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res; }至此,所有功能都實現了,有了C++11,是不是一切都變得美好了起來,用了60行就實現了以前無數行才能實現的功能,而且簡單易懂,支持現代化的C++調用。
5.?完整代碼
class ThreadPool {public:ThreadPool(size_t);template <class F, class... Args>auto enqueue(F &&f, Args &&... args) -> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop; }; template <class F, class... Args> auto ThreadPool::enqueue(F &&f, Args &&... args)-> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res; } inline ThreadPool::ThreadPool(size_t threads): stop(false) {for (size_t i = 0; i < threads; ++i)workers.emplace_back([this] {for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}}); } inline ThreadPool::~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread &worker : workers)worker.join(); } #include <iostream> #include <vector> #include <chrono> #include <map> #include <thread> #include <string> #include <tuple> #include "MThreadPool.hpp" #include <log4cplus/logger.h> //#include <log4cplus/consoleappender.h> #include <log4cplus/fileappender.h> #include <log4cplus/layout.h> //#include <log4cplus/ndc.h> //#include <log4cplus/mdc.h> #include <log4cplus/helpers/loglog.h> #include <log4cplus/thread/threads.h> //#include <log4cplus/helpers/sleep.h> #include <log4cplus/loggingmacros.h>using namespace std; using namespace log4cplus; using namespace log4cplus::helpers; Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("Max:"));using namespace std; typedef tuple<string, int> M_TUPLE; typedef std::function<M_TUPLE()> M_FUNCTION; vector<std::function<tuple<string, int>()>> M_VECTOR; typedef std::lock_guard<std::recursive_mutex> M_GUARD; typedef std::unique_lock<std::recursive_mutex> M_UNIQUE; std::recursive_mutex func_mutex;enum RET_CODE {M_SUCCESS = 0,M_FAIL,M_MAX };M_TUPLE M_put() {std::string M_func(__func__);// fucntion bodyLOG4CPLUS_DEBUG(logger, "hello!");std::this_thread::sleep_for(std::chrono::microseconds(100)); //seconds(1));LOG4CPLUS_DEBUG(logger, "world!");return std::make_tuple(M_func, M_SUCCESS); } void M_LOG() { }int main() {log4cplus::initialize();try{SharedObjectPtr<Appender> append_1(new FileAppender("Test.log"));append_1->setName(LOG4CPLUS_TEXT("First"));log4cplus::tstring pattern = LOG4CPLUS_TEXT("[%d{%m/%d/%y %H:%M:%S,%Q}] %c %-5p - %m [%l]%n");// std::tstring pattern = LOG4CPLUS_TEXT("%d{%c} [%t] %-5p [%.15c{3}] %%%x%% - %m [%l]%n");append_1->setLayout(std::auto_ptr<Layout>(new PatternLayout(pattern)));Logger::getRoot().addAppender(append_1);logger.setLogLevel(DEBUG_LOG_LEVEL);}catch (...){Logger::getRoot().log(FATAL_LOG_LEVEL, LOG4CPLUS_TEXT("Exception occured..."));}LOG4CPLUS_DEBUG(logger, "set logger done!"<< "\nhello log4cplus\n");int thread_num = std::thread::hardware_concurrency();if (!thread_num){thread_num = 1;}M_VECTOR.push_back(M_put);M_VECTOR.push_back(M_put);M_VECTOR.push_back(M_put);M_VECTOR.push_back(M_put);std::cout << " start " << thread_num << "threads" << std::endl;ThreadPool pool(thread_num);std::vector<std::future<M_TUPLE>> results;M_FUNCTION tmp;while (!M_VECTOR.empty()){{M_GUARD lock1(func_mutex);tmp = M_VECTOR.back();}results.emplace_back(pool.enqueue([=] {return tmp();}));{M_GUARD lock1(func_mutex);M_VECTOR.pop_back();}}for (auto &&result : results){std::string tmp_str;int tmp_bool;tie(tmp_str, tmp_bool) = result.get();cout << "string is " << tmp_str << "bool is " << tmp_bool << endl;}std::cout << std::flush;return 0; }?
總結
以上是生活随笔為你收集整理的用C++11 实现 thread pool的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 标准化条件变量 -- condition
- 下一篇: 智能指针shared_ptr