These are my study notes on thread pools.
线程池核心就是线程池中的线程会持续查询任务队列是否有可用工作,如果有可用工作则将其取出并执行。
任务队列
任务队列需要实现的就是插入、删除任务等操作。
- 判断队列是否为空 SafeQueue::empty()
1 2 3 4 5
| bool empty() { std::unique_lock<std::mutex> lock(m_mutex); return m_queue.empty(); }
|
- 返回队列长度 SafeQueue::size()
1 2 3 4 5
| int size() { std::unique_lock<std::mutex> lock(m_mutex); return m_queue.size(); }
|
- 插入任务 SafeQueue::enqueue(T &t)
1 2 3 4 5 6
| void enqueue(T &t) { std::unique_lock<std::mutex> lock(m_mutex); m_queue.emplace(t); }
|
- 取出任务 SafeQueue::dequeue(T &t)
1 2 3 4 5 6 7 8 9 10
| bool dequeue(T &t) { std::unique_lock<std::mutex> lock(m_mutex); if (m_queue.empty()) return false; t = std::move(m_queue.front()); m_queue.pop(); return true; }
|
线程池代码
使用内置class(ThreadWorker)执行真正的操作
- 内置class构造函数
1 2 3 4 5
| ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id) { }
|
- 重载()操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| void operator()() { std::function<void()> func; bool dequeued; while (!m_pool->m_shutdown) { { std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex); if (m_pool->m_queue.empty()) { m_pool->m_conditional_lock.wait(lock); } dequeued = m_pool->m_queue.dequeue(func); } if (dequeued) func(); } }
|
线程池初始化 init()
1 2 3 4 5 6 7 8 9
| void init() { for (int i = 0; i < m_threads.size(); ++i) { m_threads.at(i) = std::thread(ThreadWorker(this, i)); } }
|
线程池关闭 shutdown()
1 2 3 4 5 6 7 8 9 10 11 12 13
| void shutdown() { m_shutdown = true; m_conditional_lock.notify_all(); for (int i = 0; i < m_threads.size(); ++i) { if (m_threads.at(i).joinable()) { m_threads.at(i).join(); } } }
|
线程池任务提交 submit
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| template <typename F, typename... Args> auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> { std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func); std::function<void()> warpper_func = [task_ptr]() { (*task_ptr)(); }; m_queue.enqueue(warpper_func); m_conditional_lock.notify_one(); return task_ptr->get_future(); }
|
总体代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
/// Minimal FIFO queue whose every operation is serialized by an internal mutex.
template <typename T>
class SafeQueue
{
private:
    std::queue<T> m_queue;
    mutable std::mutex m_mutex; // mutable so the const observers can lock it

public:
    SafeQueue() = default;

    /// Takes over the elements of `other` (locked while they are moved out).
    SafeQueue(SafeQueue &&other)
    {
        std::lock_guard<std::mutex> lock(other.m_mutex);
        m_queue = std::move(other.m_queue);
    }

    ~SafeQueue() = default;

    /// True when the queue holds no elements at the moment of the call.
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.empty();
    }

    /// Number of queued elements, narrowed to int (the declared return type).
    int size() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return static_cast<int>(m_queue.size());
    }

    /// Appends a copy of `t` to the back of the queue.
    void enqueue(const T &t)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(t);
    }

    /// Appends `t` to the back of the queue by moving from it.
    void enqueue(T &&t)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(std::move(t));
    }

    /// Moves the front element into `t` and removes it.
    /// @return false (leaving `t` untouched) when the queue was empty.
    bool dequeue(T &t)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty())
        {
            return false;
        }
        t = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }
};

/// Fixed-size thread pool: worker threads repeatedly pull callables from a
/// shared queue and run them.
///
/// Usage: construct, call init() to start the workers, submit() work, then
/// shutdown() (also performed by the destructor). Tasks still queued when
/// shutdown() is called are not executed.
class ThreadPool
{
private:
    /// Callable run by each worker thread.
    class ThreadWorker
    {
    private:
        int m_id;
        ThreadPool *m_pool;

    public:
        // Initializers follow declaration order (m_id, then m_pool).
        ThreadWorker(ThreadPool *pool, const int id) : m_id(id), m_pool(pool)
        {
        }

        /// Worker loop: wait for work or shutdown, run one task, repeat.
        void operator()()
        {
            std::function<void()> func;
            while (true)
            {
                bool dequeued = false;
                {
                    std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
                    // Predicate wait: both conditions are evaluated under the
                    // same mutex that submit()/shutdown() hold when changing
                    // them, so no notification can be lost.
                    m_pool->m_conditional_lock.wait(lock, [this] {
                        return m_pool->m_shutdown || !m_pool->m_queue.empty();
                    });
                    if (m_pool->m_shutdown)
                    {
                        return;
                    }
                    dequeued = m_pool->m_queue.dequeue(func);
                }
                // Execute outside the lock so other workers are not blocked.
                if (dequeued)
                {
                    func();
                }
            }
        }
    };

    bool m_shutdown; // guarded by m_conditional_mutex
    SafeQueue<std::function<void()>> m_queue;
    std::vector<std::thread> m_threads;
    std::mutex m_conditional_mutex;
    std::condition_variable m_conditional_lock;

public:
    /// Reserves `n_threads` worker slots; no thread runs until init().
    ThreadPool(const int n_threads = 4)
        : m_shutdown(false), m_threads(std::vector<std::thread>(n_threads))
    {
    }

    ThreadPool(const ThreadPool &) = delete;
    ThreadPool(ThreadPool &&) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;
    ThreadPool &operator=(ThreadPool &&) = delete;

    /// Stops and joins the workers so destroying a running pool does not
    /// reach std::thread's destructor with joinable threads (std::terminate).
    ~ThreadPool()
    {
        shutdown();
    }

    /// Starts one worker thread per slot.
    void init()
    {
        int worker_id = 0;
        for (std::thread &slot : m_threads)
        {
            slot = std::thread(ThreadWorker(this, worker_id));
            ++worker_id;
        }
    }

    /// Signals the workers to stop and joins them. Idempotent.
    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(m_conditional_mutex);
            m_shutdown = true;
        }
        m_conditional_lock.notify_all();

        for (std::thread &worker : m_threads)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }
    }

    /// Queues f(args...) for asynchronous execution.
    /// @return a future for the call's result (or the exception it threw).
    template <typename F, typename... Args>
    auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
    {
        using result_type = decltype(f(args...));

        std::function<result_type()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

        // shared_ptr makes the move-only packaged_task storable in a
        // copyable std::function<void()>.
        auto task_ptr = std::make_shared<std::packaged_task<result_type()>>(func);

        // Fetch the future before a worker can see (and run) the task.
        std::future<result_type> result = task_ptr->get_future();

        std::function<void()> wrapper_func = [task_ptr]() { (*task_ptr)(); };

        {
            // Enqueue under the condition mutex to avoid a lost wakeup.
            std::lock_guard<std::mutex> lock(m_conditional_mutex);
            m_queue.enqueue(std::move(wrapper_func));
        }
        m_conditional_lock.notify_one();

        return result;
    }
};
|