C 语言

关注公众号 jb51net

关闭
首页 > 软件编程 > C 语言 > 轻量级C++线程池实现

从零实现一个轻量级C++线程池

作者:UrSpecial

本文旨在构建一个轻量级的、可扩展的C++线池,文章详细阐述了线程池的工作原理、核心组成部分及其设计思想,通过具体代码展示了任务队列、同步机制、任务接口等模块的设计与实现,需要的朋友可以参考下

一、引言

本文目标:从零实现一个轻量、可用、可扩展的 C++ 线程池。

关键技术点:

  1. std::thread
  2. std::mutex / std::unique_lock
  3. std::condition_variable
  4. std::function / future / packaged_task / bind

二、什么是线程池

线程池是一种用于管理和复用线程的并发编程模型。它的核心思想是预先创建一组工作线程,并将它们放入一个“池”中进行管理。当有新任务需要处理时,不再创建和销毁线程,而是直接将任务提交给线程池,由池中空闲的线程来执行。

三、为什么需要线程池

频繁地创建和销毁线程会带来显著的系统开销,包括内存分配、切换到内核态等。线程池通过复用线程,有效解决了这个问题,并带来了以下优势:

四、线程池的核心组成

  1. 工作线程集合 (Worker Threads):池中预先创建好的一组线程,它们会持续运行,不断从任务队列中获取并执行任务。
  2. 任务队列 (Task Queue):一个线程安全的队列,用于存放所有待执行的任务。它作为任务提交者和工作线程之间的缓冲区。
  3. 同步机制 (Synchronization)
    • 互斥锁 (Mutex):用于保护任务队列,确保在多线程环境下对队列的访问是安全的,防止竞态条件。
    • 条件变量 (Condition Variable):用于工作线程的等待和唤醒。当任务队列为空时,工作线程会进入等待状态;当有新任务加入时,会通知(唤醒)一个或所有等待的线程。
  4. 任务接口 (Task Interface):一个用于提交任务的方法,允许外部将各种类型的任务(函数、Lambda表达式等)提交到线程池中。

五、C++线程池的实现

因为在代码中使用到了一些异步编程技术,所以先做个简单的介绍。

std::condition_variable——条件变量,它是一种线程间的同步机制,当没有任务时,它会阻塞工作线程。生产者线程将任务加入队列后,会通过同一个条件变量唤醒在该条件变量下等待的线程。这么做的好处是,避免了工作线程循环检测队列中有没有任务带来的CPU开销。代码中用到的接口主要有3个:

void wait (unique_lock<mutex>& lck, Predicate pred);

第一个参数是互斥锁,该函数内部会将锁释放,避免线程休眠时持有锁,导致其他活跃线程拿不到锁。

第二个参数是一个可调用对象,这个可调用对象必须能够返回true或false。而且,这个可调用对象会循环的执行,直到它的返回结果是true。

pred可调用对象返回true后,线程被唤醒,重新获取锁,向下执行。

void notify_one() noexcept;

唤醒一个在该条件变量下等待的线程。

void notify_all() noexcept;

唤醒所有在该条件变量下等待的线程。

std::future,用来获取异步执行的结果。如果没有std::future,在C++中想要获取其他线程的返回值的话,我们需要将这个返回值写入全局变量,这样其他的线程才可以看到。全局变量是共享资源,多线程场景下是需要加锁保护的,std::future封装了这些底层的细节,提供了一种同步获取结果的方式。

get()方法

会阻塞调用线程,直到拿到结果才继续往下执行,所以说它是一种同步获取结果的方式。

std::function,是一种函数包装器,统一函数的类型。为什么这么说?函数指针、lambda表达式虽然都是可调用对象,但是它们的类型是完全不同的,所以就不能够放到同一个容器中统一管理。但是,经过function的包装后,它们就有了统一的类型。下面在实现线程池中就可以看到如何使用。

std::packaged_task,是一种任务包装器,用来获取异步执行任务的结果的。它内部有一个关联的future对象,函数的返回值会被写入到这个future对象中,同时它还提供了一个获取这个future对象的接口get_future(),其他线程拿到这个关联的future后,就可以调用get()方法获取异步执行的结果。顺便说一下,packaged_task内部重载了(),可以直接通过packaged_task对象执行它包装的任务。

std::bind,绑定函数参数,并返回一个可调用对象。比如,函数Add(int a, int b)原本是需要传入两个参数的,是以这种形式 Add(10, 20) 调用的。但是,经过std::bind绑定后,例如 auto func = std::bind(Add, 10, 20); 在调用的时候,就不用传参了,直接这样 func() 调用,效果是一样的。为什么要绑定参数,在下面的线程池实现中,就很明白了。

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <future>
#include <mutex>
#include <functional>
#include <chrono>
#include <type_traits>
#include <condition_variable>
class ThreadPool {
public:
    ThreadPool(size_t thread_num = 4):
        _thread_num(thread_num),
        _start(false),
        _stop(false)
    {}
    ~ThreadPool(){
        if (_start && !_stop) stop();
    }
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;
    void start() {
        std::unique_lock<std::mutex> lock(_mutex);
        if (_start) return;
        _workers.reserve(_thread_num);
        for (size_t i = 0; i < _thread_num; i++) {
            _workers.emplace_back(std::thread([this](){
                work_loop();
            }));
        }
        _start = true;
    }
    void stop() {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            if (!_start || _stop) return;
            // 在join回收线程之前,必须先将_stop置为true,
            // 否则工作线程可能会一直阻塞在条件变量上,导致无法正常退出,甚至会导致程序崩溃
            _stop = true;
        }
        _cond.notify_all();
        for (auto& worker : _workers) {
            if (worker.joinable()) worker.join();
        }
    }
    template<class F, class... Args>
    auto submit(F&& f, Args&&... args)->std::future<std::invoke_result_t<F, Args...>> {
        using return_type = std::invoke_result_t<F, Args...>;
        // 绑定函数参数,并交给任务包装器
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        // 获取关联的future
        std::future<return_type> res = task->get_future();
        // 加锁+入队列
        {
            std::unique_lock<std::mutex> lock(_mutex);
            if (_stop || !_start) throw std::runtime_error("线程池未启动!");
            _tasks.emplace([task](){(*task)();});
        }
        _cond.notify_one();
        return res;
    }
private:
    void work_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _cond.wait(lock, [this](){
                    return _stop || !_tasks.empty();
                });
                if (_stop && _tasks.empty()) return;
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            task();
        }
    }
private:
    std::vector<std::thread> _workers;
    std::queue<std::function<void()>> _tasks;
    std::mutex _mutex;
    std::condition_variable _cond;
    size_t _thread_num;
    bool _start;
    bool _stop;
};
int add(int a, int b) {
    return a + b;
}
void print() {
    std::cout << "-------------------print-------------------" << std::endl;
    std::cout << "Hello World!" << std::endl;
}
int main() {
    ThreadPool pool(4);
    pool.start();
    std::cout << "==================ThreadPoolTest==================" << std::endl;
    pool.submit([](){
        std::cout << "-------------------lambda-------------------" << std::endl;
        std::cout << "this is a lambda!" << std::endl;
    });
    std::this_thread::sleep_for(std::chrono::seconds(3));  
    auto ret1 = pool.submit(add, 10, 20);
    std::cout << "-------------------add-------------------" << std::endl;
    std::cout << "10 + 20 = " << ret1.get() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(3));  
    pool.submit(print);
    pool.stop();
    return 0;
}

运行结果:

在上述代码实现中,用到了C++17的语法。

这里我解释一下队列中的任务参数为空,并且返回值为void,但是带返回值的add函数为什么可以插入队列中。

通过std::bind绑定函数的所有参数,所有就做到了“无参”。

通过std::packaged_task获取函数的返回值,所以不担心返回值拿不到。

通过lambda封装一层,不管原本是否有参数,是否有返回值,加入队列中的任务都是满足无参和无返回值的。

这三个操作组合在一起,不管函数有无返回值,都能适配到任务队列中。

另外,在绑定参数时,不仅对函数参数进行了完美转发,还对函数本身进行了完美转发。对参数进行完美转发是因为要保持它本身的左右值属性。对函数进行完美转发是因为,我们在使用时,可能会直接在submit函数传入lambda,这时候lambda它是一个右值,采用完美转发可以保持它的右值属性,触发移动语义,也就避免了std::bind内部对它进行拷贝。

以上就是从零实现一个轻量级C++线程池的详细内容,更多关于轻量级C++线程池实现的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文