Accelerating Work with a C++ Thread Pool
After a long period of exploration, I found a thread pool implementation that does not need a manually tuned worker sleep interval (e.g. std::this_thread::sleep_for(std::chrono::microseconds(1))):
Github: https://github.com/log4cplus/ThreadPool
#ifndef THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c
#define THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
#include <functional>
#include <stdexcept>
#include <algorithm>
#include <cassert>

namespace progschj {

class ThreadPool {
public:
    explicit ThreadPool(std::size_t threads
        = (std::max)(2u, std::thread::hardware_concurrency()));

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;

    void wait_until_empty();
    void wait_until_nothing_in_flight();

    void set_queue_size_limit(std::size_t limit);
    void set_pool_size(std::size_t limit);

    ~ThreadPool();

private:
    void start_worker(std::size_t worker_number,
        std::unique_lock<std::mutex> const &lock);

    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // target pool size
    std::size_t pool_size;
    // the task queue
    std::queue< std::function<void()> > tasks;
    // queue length limit
    std::size_t max_queue_size = 100000;
    // stop signal
    bool stop = false;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition_producers;
    std::condition_variable condition_consumers;

    std::mutex in_flight_mutex;
    std::condition_variable in_flight_condition;
    std::atomic<std::size_t> in_flight;

    struct handle_in_flight_decrement
    {
        ThreadPool & tp;

        handle_in_flight_decrement(ThreadPool & tp_)
            : tp(tp_)
        { }

        ~handle_in_flight_decrement()
        {
            std::size_t prev
                = std::atomic_fetch_sub_explicit(&tp.in_flight,
                    std::size_t(1),
                    std::memory_order_acq_rel);
            if (prev == 1)
            {
                std::unique_lock<std::mutex> guard(tp.in_flight_mutex);
                tp.in_flight_condition.notify_all();
            }
        }
    };
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(std::size_t threads)
    : pool_size(threads)
    , in_flight(0)
{
    std::unique_lock<std::mutex> lock(this->queue_mutex);
    for (std::size_t i = 0; i != threads; ++i)
        start_worker(i, lock);
}

// add new work item to the pool
// there are two ways to enqueue a call to a class member function:
// one is std::bind:   .enqueue(std::bind(&Dog::sayHello, &dog));
// one is std::mem_fn: .enqueue(std::mem_fn(&Dog::sayHello), this)
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<return_type> res = task->get_future();

    std::unique_lock<std::mutex> lock(queue_mutex);
    if (tasks.size() >= max_queue_size)
        // wait for the queue to empty or be stopped
        condition_producers.wait(lock,
            [this]
            {
                return tasks.size() < max_queue_size
                    || stop;
            });

    // don't allow enqueueing after stopping the pool
    if (stop)
        // once the pool has begun destructing, adding new tasks is not allowed
        throw std::runtime_error("enqueue on stopped ThreadPool");

    tasks.emplace([task]() { (*task)(); });
    std::atomic_fetch_add_explicit(&in_flight,
        std::size_t(1),
        std::memory_order_relaxed);
    condition_consumers.notify_one();

    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    stop = true;
    pool_size = 0;
    condition_consumers.notify_all();
    condition_producers.notify_all();
    condition_consumers.wait(lock, [this]{ return this->workers.empty(); });
    assert(in_flight == 0);
}

inline void ThreadPool::wait_until_empty()
{
    std::unique_lock<std::mutex> lock(this->queue_mutex);
    this->condition_producers.wait(lock,
        [this]{ return this->tasks.empty(); });
}

inline void ThreadPool::wait_until_nothing_in_flight()
{
    std::unique_lock<std::mutex> lock(this->in_flight_mutex);
    this->in_flight_condition.wait(lock,
        [this]{ return this->in_flight == 0; });
}

inline void ThreadPool::set_queue_size_limit(std::size_t limit)
{
    std::unique_lock<std::mutex> lock(this->queue_mutex);

    if (stop)
        return;

    std::size_t const old_limit = max_queue_size;
    max_queue_size = (std::max)(limit, std::size_t(1));
    if (old_limit < max_queue_size)
        condition_producers.notify_all();
}

inline void ThreadPool::set_pool_size(std::size_t limit)
{
    if (limit < 1)
        limit = 1;

    std::unique_lock<std::mutex> lock(this->queue_mutex);

    if (stop)
        return;

    std::size_t const old_size = pool_size;
    assert(this->workers.size() >= old_size);

    pool_size = limit;

    if (pool_size > old_size)
    {
        // create new worker threads
        // it is possible that some of these are still running because
        // they have not stopped yet after a pool size reduction, such
        // workers will just keep running
        for (std::size_t i = old_size; i != pool_size; ++i)
            start_worker(i, lock);
    }
    else if (pool_size < old_size)
        // notify all worker threads to start downsizing
        this->condition_consumers.notify_all();
}

inline void ThreadPool::start_worker(
    std::size_t worker_number, std::unique_lock<std::mutex> const &lock)
{
    assert(lock.owns_lock() && lock.mutex() == &this->queue_mutex);
    assert(worker_number <= this->workers.size());

    auto worker_func =
        [this, worker_number]
        {
            for (;;)
            {
                std::function<void()> task;
                bool notify;

                {
                    std::unique_lock<std::mutex> lock(this->queue_mutex);
                    this->condition_consumers.wait(lock,
                        [this, worker_number]{
                            return this->stop || !this->tasks.empty()
                                || pool_size < worker_number + 1; });

                    // deal with downsizing of thread pool or shutdown
                    if ((this->stop && this->tasks.empty())
                        || (!this->stop && pool_size < worker_number + 1))
                    {
                        // detach this worker, effectively marking it stopped
                        this->workers[worker_number].detach();
                        // downsize the workers vector as much as possible
                        while (this->workers.size() > pool_size
                            && !this->workers.back().joinable())
                            this->workers.pop_back();
                        // if this was the last worker, notify the destructor
                        if (this->workers.empty())
                            this->condition_consumers.notify_all();
                        return;
                    }
                    else if (!this->tasks.empty())
                    {
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                        notify = this->tasks.size() + 1 == max_queue_size
                            || this->tasks.empty();
                    }
                    else
                        continue;
                }

                handle_in_flight_decrement guard(*this);

                if (notify)
                {
                    std::unique_lock<std::mutex> lock(this->queue_mutex);
                    condition_producers.notify_all();
                }

                task();
            }
        };

    if (worker_number < this->workers.size()) {
        std::thread & worker = this->workers[worker_number];
        // start only if not already running
        if (!worker.joinable()) {
            worker = std::thread(worker_func);
        }
    }
    else
        this->workers.push_back(std::thread(worker_func));
}

} // namespace progschj

#endif // THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c
Demo
#include <iostream>
#include <vector>
#include <chrono>

#include "ThreadPool.h"

using namespace progschj;

int main()
{
    ThreadPool pool;
    std::vector< std::future<int> > results;

    for(int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::cout << "hello " << i << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::cout << "world " << i << std::endl;
                return i*i;
            })
        );
    }

    pool.wait_until_empty();
    pool.wait_until_nothing_in_flight();

    for(auto && result: results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;

    return 0;
}
These two calls guarantee that every submitted task has fully finished before the program moves on: wait_until_empty() blocks until the task queue has been drained, and wait_until_nothing_in_flight() additionally blocks until every task a worker has already taken off the queue has finished running.
pool.wait_until_empty();
pool.wait_until_nothing_in_flight();
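As a rough sketch of how the pair is typically used (assuming the ThreadPool.h shown above; the two phases of placeholder work are made up for illustration), the two calls together act as a barrier between batches of fire-and-forget tasks:

#include <iostream>

#include "ThreadPool.h"

using namespace progschj;

int main()
{
    ThreadPool pool;

    // Phase 1: fire-and-forget tasks, the returned futures are ignored.
    for (int i = 0; i < 16; ++i)
        pool.enqueue([i] { std::cout << "phase 1, task " << i << '\n'; });

    // Barrier: queue drained...
    pool.wait_until_empty();
    // ...and every task a worker had already picked up has finished.
    pool.wait_until_nothing_in_flight();

    // Phase 2 only begins once phase 1 has completely finished.
    for (int i = 0; i < 16; ++i)
        pool.enqueue([i] { std::cout << "phase 2, task " << i << '\n'; });

    pool.wait_until_empty();
    pool.wait_until_nothing_in_flight();
    return 0;
}

Phase 2 therefore cannot interleave with phase 1, even though none of the futures returned by enqueue are kept.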
The code above is adapted from the following project:
Github: https://github.com/progschj/ThreadPool
Someone has written a brief annotated walkthrough of that code; see:
http://www.nodekey.com/threadpool-e6-b3-a8-e8-a7-a3/
When using the thread pool you can submit a lambda expression, or you can also write it like this:
mpool.enqueue(Function_name, arg1, arg2, arg3);
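For example, here is a minimal sketch of that call form, assuming a made-up free function add; enqueue forwards the extra arguments and returns a std::future holding the function's result:

#include <iostream>

#include "ThreadPool.h"

using namespace progschj;

// Hypothetical free function, used only to illustrate the call syntax.
int add(int a, int b) { return a + b; }

int main()
{
    ThreadPool pool;

    // Equivalent to the lambda form: the extra arguments are bound to the call.
    std::future<int> result = pool.enqueue(add, 2, 3);

    std::cout << result.get() << std::endl;   // prints 5
    return 0;
}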
However, if the function is a member function, you run into this nasty compiler error:
non-standard syntax; use '&' to create a pointer to member
For a detailed explanation, see:
https://www.cnblogs.com/blog-vincent-0x1F7/p/9668533.html
https://linustechtips.com/topic/772287-unable-to-use-stdthread-part-2/
The fix:
There are two ways to enqueue a call to a class member function. One is std::bind: .enqueue(std::bind(&Dog::sayHello, &dog)); the other is std::mem_fn: .enqueue(std::mem_fn(&Dog::sayHello), this).
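Putting both workarounds together, a minimal self-contained sketch (the Dog class with a sayHello member is made up, following the snippet above) might look like this:

#include <iostream>
#include <functional>

#include "ThreadPool.h"

using namespace progschj;

// Made-up example class matching the snippet above.
struct Dog {
    void sayHello() { std::cout << "woof" << std::endl; }
};

int main()
{
    ThreadPool pool;
    Dog dog;

    // Option 1: bind the member function to an object pointer up front.
    pool.enqueue(std::bind(&Dog::sayHello, &dog));

    // Option 2: wrap it with mem_fn and pass the object pointer as an argument
    // (inside a member function of Dog you would pass `this` instead of `&dog`).
    pool.enqueue(std::mem_fn(&Dog::sayHello), &dog);

    pool.wait_until_empty();
    pool.wait_until_nothing_in_flight();
    return 0;
}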