c++11 threadpool

c++11 threadpool的实现,这里参考 github(4.7k stars), 在很多中小项目中都可以看到这个版本的实现,这里记录一下关键点.

实现:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif

使用方法:

int main(){
    // create thread pool with 4 worker threads

    ThreadPool pool(4);

    // enqueue and store future
    auto result = pool.enqueue([](int answer) { return answer; }, 42);

    // get result from future
    std::cout << result.get() << std::endl;
}
42

这里主要分析一些关键点.

  • 整体上使用了基于condition_variable同步的生产消费模型,外部的生产者通过enque提交task (可调用对象和参数)到队列中,消费者线程取出task,并执行。这里condition_variable的使用方法是其最常见的pattern(八股).
  • 类型萃取,即使用std::result_of获得返回类型:using return_type = typename std::result_of<F(Args...)>::type;.
  • 使用std::queue存储std::function.由于容器中存储对象必须为同一类型,即std::function的模板参数必须相同。而实际使用时需要能够存储任意函数,这里需要将执行的函数统一转换为void()类型。所以有两点需要解决,第一点是输入参数的绑定,第二点是运行结果的获取,这里使用了两个技术来完成该需求:
    • 对于第一点 ,使用std::bind,即绑定函数与函数的输入参数,这里将输入参数全部与函数进行绑定,即std::bind(std::forward<F>(f), std::forward<Args>(args)...), 这样std::bind返回是一个签名为return_type()的callable type,它的输入参数为空。
    • 第二点通过std::packaged_task构建std::future,即上面的变量res,并返回给外部的调用者。外部的调用者即可通过res.get()获得函数的运行结果。

这里一个注意点是std::function无法用来包装std::packaged_task,因为std::function的参数必须要是copy constructible的(cppreference), 而std::packaged_task的copy constructor被delete,只有move constructor,所以产生了矛盾。为了达成这个要求,一个简单的想法是使用 lambda 来绕过这个限制, 但是实际上即使使用lambda,在传入std::function时依然会调用std::packaged_task的copy construtor,导致无法编译通过,如(stackoverflow):

#include <thread>
#include <vector>
#include <functional>
#include <future>
#include <queue>

int main() {
    auto f = []() { return 1; }; 
    std::queue<std::function<void()>> q;
    std::packaged_task<int()> pack(f);
    auto lambda = [p = std::move(pack)] () mutable { p(); };
    q.emplace(std::move(lambda));

    return 0;
}
error : use of deleted function ‘std::packaged_task<_Res(_ArgTypes ...)>::packaged_task(const std::packaged_task<_Res(_ArgTypes ...)>&) [with _Res = int; _ArgTypes = {}]’

所以这里的实现中,给出的方法是在heap上创建std::packaged_task,构造智能指针,即auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); 之后再使用lambda capture 指针类型的task, 即tasks.emplace([task](){ (*task)(); });即保证了传入std::function时只会复制指针,从而避免了copy constructor的调用.

所以将上面无法编译的代码改写如下:

#include <thread>
#include <vector>
#include <functional>
#include <future>
#include <queue>
#include <utility>
#include <iostream>

int main() {
    auto f = []() { return 1; }; 
    std::queue<std::function<void()>> q;
    auto task = std::make_shared<std::packaged_task<int()>> (f);
    std::future<int> res = task->get_future();
    q.emplace([task](){(*task)();});

    q.front()();
    q.pop();
    std::cout << "result of task : " << res.get() << std::endl;
    return 0;
}
result of task : 1

此时可以编译通过,并得到正确结果.(注意,这里链接pthread时的 gcc 命令使用-pthread,不要使用-lpthread,否则会产生触发core dump, 参考stackoverflow)

  • 通过使用{}控制生命周期来控制析构的时间点,从而更好地利用RAII.如这里:
{
    std::unique_lock<std::mutex> lock(this->queue_mutex);
    this->condition.wait(lock,
        [this]{ return this->stop || !this->tasks.empty(); });
    if(this->stop && this->tasks.empty())
        return;
    task = std::move(this->tasks.front());
    this->tasks.pop();
}

task();

这样保证了mutex在执行task()前就被unlock,避免了无意义的锁占用。

编辑于 2021-09-13 21:00