c++11 threadpool
c++11 threadpool的实现,这里参考 github(4.7k stars), 在很多中小项目中都可以看到这个版本的实现,这里记录一下关键点.
实现:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
#endif
使用方法:
int main(){
// create thread pool with 4 worker threads
ThreadPool pool(4);
// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);
// get result from future
std::cout << result.get() << std::endl;
}
42
这里主要分析一些关键点.
- 整体上使用了基于
condition_variable
同步的生产消费模型,外部的生产者通过enque提交task (可调用对象和参数)到队列中,消费者线程取出task,并执行。这里condition_variable
的使用方法是其最常见的pattern(八股). - 类型萃取,即使用
std::result_of
获得返回类型:using return_type = typename std::result_of<F(Args...)>::type;
. - 使用
std::queue
存储std::function
.由于容器中存储对象必须为同一类型,即std::function
的模板参数必须相同。而实际使用时需要能够存储任意函数,这里需要将执行的函数统一转换为void()
类型。所以有两点需要解决,第一点是输入参数的绑定,第二点是运行结果的获取,这里使用了两个技术来完成该需求: - 对于第一点 ,使用
std::bind
,即绑定函数与函数的输入参数,这里将输入参数全部与函数进行绑定,即std::bind(std::forward<F>(f), std::forward<Args>(args)...)
, 这样std::bind
返回是一个签名为return_type()
的callable type,它的输入参数为空。 - 第二点通过
std::packaged_task
构建std::future
,即上面的变量res
,并返回给外部的调用者。外部的调用者即可通过res.get()
获得函数的运行结果。
这里一个注意点是std::function
无法用来包装std::packaged_task
,因为std::function
的参数必须要是copy constructible的(cppreference), 而std::packaged_task
的copy constructor被delete,只有move constructor,所以产生了矛盾。为了达成这个要求,一个简单的想法是使用 lambda 来绕过这个限制, 但是实际上即使使用lambda,在传入std::function
时依然会调用std::packaged_task
的copy construtor,导致无法编译通过,如(stackoverflow):
#include <thread>
#include <vector>
#include <functional>
#include <future>
#include <queue>
int main() {
auto f = []() { return 1; };
std::queue<std::function<void()>> q;
std::packaged_task<int()> pack(f);
auto lambda = [p = std::move(pack)] () mutable { p(); };
q.emplace(std::move(lambda));
return 0;
}
error : use of deleted function ‘std::packaged_task<_Res(_ArgTypes ...)>::packaged_task(const std::packaged_task<_Res(_ArgTypes ...)>&) [with _Res = int; _ArgTypes = {}]’
所以这里的实现中,给出的方法是在heap上创建std::packaged_task
,构造智能指针,即auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
之后再使用lambda capture 指针类型的task, 即tasks.emplace([task](){ (*task)(); });
即保证了传入std::function
时只会复制指针,从而避免了copy constructor的调用.
所以将上面无法编译的代码改写如下:
#include <thread>
#include <vector>
#include <functional>
#include <future>
#include <queue>
#include <utility>
#include <iostream>
int main() {
auto f = []() { return 1; };
std::queue<std::function<void()>> q;
auto task = std::make_shared<std::packaged_task<int()>> (f);
std::future<int> res = task->get_future();
q.emplace([task](){(*task)();});
q.front()();
q.pop();
std::cout << "result of task : " << res.get() << std::endl;
return 0;
}
result of task : 1
此时可以编译通过,并得到正确结果.(注意,这里链接pthread
时的 gcc 命令使用-pthread
,不要使用-lpthread
,否则会产生触发core dump, 参考stackoverflow)
- 通过使用
{}
控制生命周期来控制析构的时间点,从而更好地利用RAII.如这里:
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
这样保证了mutex在执行task()
前就被unlock
,避免了无意义的锁占用。