C++线程池实现

主要思想:让每一个thread创建后,就去执行调度函数——循环获取task,然后执行。
这个循环该什么时候停止呢?
很简单:当线程池停止使用时,循环停止。
这样一来,就保证了thread函数的唯一性,而且复用线程执行task。
总结一下,我们的线程池的主要组成部分有二:
- 任务队列(Task Queue)
- 线程池(BMP)
线程池与任务队列之间的匹配操作,是典型的生产者-消费者模型。本模型使用了两个工具:一个 mutex 和一个 condition_variable。mutex 就是锁,保证任务的添加和移除(获取)的互斥性;条件变量保证多个线程获取task的同步性:当任务队列为空时,线程应该等待(阻塞)。
BMP.h:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

/// Minimal thread-safe FIFO queue: every operation takes an internal mutex.
template <typename T>
class safeQueue {
private:
    std::queue<T> _m_queue;
    mutable std::mutex _m_lock;  // mutable so the const observers can lock it

public:
    /// @return true when the queue currently holds no element.
    bool empty() const {
        std::lock_guard<std::mutex> en_lock(_m_lock);
        return _m_queue.empty();
    }

    /// Appends @p value at the back of the queue.
    void push(T value) {
        std::lock_guard<std::mutex> en_lock(_m_lock);
        _m_queue.push(std::move(value));  // value is already a copy: move it in
    }

    /// Moves the front element into @p t and removes it.
    /// @return false (leaving @p t untouched) when the queue is empty.
    bool pop(T &t) {
        std::lock_guard<std::mutex> en_lock(_m_lock);
        if (_m_queue.empty()) {
            return false;
        }
        t = std::move(_m_queue.front());
        _m_queue.pop();
        return true;
    }

    /// @return the number of queued elements.
    int size() const {
        std::lock_guard<std::mutex> en_lock(_m_lock);
        return static_cast<int>(_m_queue.size());
    }
};

/// Fixed-size thread pool.
///
/// Usage: construct, call init() to start the workers, submit() tasks and
/// read their results through the returned futures, then call shutdown()
/// (the destructor calls it as well).
///
/// Synchronisation: `_m_shutdown` and every push/pop on the task queue that
/// workers depend on happen while `_m_conditional_mutex` is held, so a worker
/// can never miss a notification between checking its wait condition and
/// going to sleep.
class BMP {
private:
    /// Callable executed by each pool thread: fetch a task, run it, repeat.
    class worker {
    private:
        std::uint32_t _worker_id;
        BMP *_m_pool;

    public:
        worker(const std::uint32_t id, BMP *pool) : _worker_id(id), _m_pool(pool) {}

        void operator()() {
            for (;;) {
                std::function<void()> func;
                {
                    std::unique_lock<std::mutex> lock(_m_pool->_m_conditional_mutex);
                    // The predicate form re-checks the condition under the
                    // lock, which makes spurious wakeups harmless.
                    _m_pool->_m_conditional_lock.wait(lock, [this] {
                        return _m_pool->_m_shutdown || !_m_pool->_m_queue.empty();
                    });
                    if (_m_pool->_m_shutdown) {
                        return;  // stop after the current task; queued tasks are not started
                    }
                    if (!_m_pool->_m_queue.pop(func)) {
                        continue;
                    }
                }
                // Run the task outside the lock so other workers can dequeue.
                func();
            }
        }
    };

    bool _m_shutdown = false;  // guarded by _m_conditional_mutex
    safeQueue<std::function<void()>> _m_queue;
    std::vector<std::thread> _m_threads;
    std::mutex _m_conditional_mutex;
    std::condition_variable _m_conditional_lock;

public:
    /// Reserves @p n_threads thread slots; no thread runs until init().
    BMP(const int n_threads = 2) : _m_threads(std::vector<std::thread>(n_threads)) {}

    BMP(const BMP &) = delete;
    BMP(BMP &&) = delete;
    BMP &operator=(const BMP &) = delete;
    BMP &operator=(BMP &&) = delete;

    /// Joins the workers if the caller forgot to: destroying a joinable
    /// std::thread would otherwise call std::terminate.
    ~BMP() { shutdown(); }

    /// Starts one worker per slot. Slots that already run a thread are left
    /// alone, so calling init() twice is harmless.
    void init() {
        for (std::uint32_t i = 0; i < _m_threads.size(); i++) {
            if (!_m_threads[i].joinable()) {
                _m_threads[i] = std::thread(worker(i, this));
            }
        }
    }

    /// Asks the workers to stop and waits for them. Tasks already running are
    /// finished; tasks still queued are discarded, which makes their futures
    /// throw std::future_error (broken_promise) instead of blocking forever.
    /// Safe to call more than once from the same thread.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(_m_conditional_mutex);
            _m_shutdown = true;
        }
        _m_conditional_lock.notify_all();
        for (std::thread &thread : _m_threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }
        // Destroying the wrappers releases the packaged_tasks they own.
        std::function<void()> discarded;
        while (_m_queue.pop(discarded)) {
        }
    }

    /// Queues `f(args...)` for execution on a pool thread.
    /// Arguments are copied/moved into the task; wrap them in std::ref to
    /// pass by reference.
    /// @return a future delivering the call's result (or its exception).
    template <typename F, typename... Args>
    auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> {
        using result_type = decltype(f(args...));
        // packaged_task is move-only while std::function needs a copyable
        // callable, hence the shared_ptr captured by the wrapper lambda.
        auto task_ptr = std::make_shared<std::packaged_task<result_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<result_type> result = task_ptr->get_future();
        {
            // Push under the mutex the workers wait on, so the notification
            // below cannot slip in between a worker's check and its wait.
            std::lock_guard<std::mutex> lock(_m_conditional_mutex);
            _m_queue.push([task_ptr]() { (*task_ptr)(); });
        }
        _m_conditional_lock.notify_one();
        return result;
    }
};

#endif
test.cpp:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 #include <iostream> #include <random> #include "BMP.h" std::random_device rd; std::mt19937 mt (rd()) ; std::uniform_int_distribution<int > dist (-1000 , 1000 ) ; auto rnd = std::bind (dist, mt);void simulate_hard_computation () { std::this_thread::sleep_for (std::chrono::milliseconds (2000 + rnd ())); } void multiply (const int a, const int b) { simulate_hard_computation (); const int res = a * b; std::cout << a << " * " << b << " = " << res << std::endl; } void multiply_output (int &out, const int a, const int b) { simulate_hard_computation (); out = a * b; std::cout << a << " * " << b << " = " << out << std::endl; } int multiply_return (const int a, const int b) { simulate_hard_computation (); const int res = a * b; std::cout << a << " * " << b << " = " << res << std::endl; return res; } void example () { BMP pool (3 ) ; pool.init (); for (int i = 1 ; i <= 3 ; ++i) for (int j = 1 ; j <= 10 ; ++j) { pool.submit (multiply, i, j); } int output_ref; auto future1 = pool.submit (multiply_output, std::ref (output_ref), 5 , 6 ); future1.get (); std::cout << "Last operation result is equals to " << output_ref << std::endl; auto future2 = pool.submit (multiply_return, 5 , 3 ); int res = future2.get (); std::cout << "Last operation result is equals to " << res << std::endl; pool.shutdown (); } int main () { example (); return 0 ; }
执行结果
参考
基于C++11实现线程池 - 知乎 (zhihu.com)