C 语言

关注公众号 jb51net

关闭
首页 > 软件编程 > C 语言 > C++线程池

使用C++实现一个高效的线程池

作者:过过过呀Glik

在多线程编程中,线程池是一种常见且高效的设计模式,本文将详细介绍如何使用C++实现一个线程池,并解析相关代码实现细节,需要的小伙伴可以参考下

在多线程编程中,线程池是一种常见且高效的设计模式。它通过预先创建一定数量的线程来处理任务,从而避免频繁创建和销毁线程带来的性能开销。本文将详细介绍如何使用C++实现一个线程池,并解析相关代码实现细节。

线程池简介

线程池(Thread Pool)是一种管理和复用线程的机制。它通过维护一个线程集合,当有任务需要执行时,从线程池中分配一个空闲线程来处理任务,任务完成后线程归还到池中。这样可以显著减少线程创建和销毁的开销,提高系统的整体性能和响应速度。

设计思路

本文实现的线程池主要包含两个核心类:

线程池支持两种模式:

MODE_CACHED:缓存模式,根据任务量动态调整线程数量,适用于任务量不固定的场景。

MODE_FIXED:固定模式,线程数量固定,适用于任务量稳定的场景。

Thread类实现

Thread类负责封装单个线程的创建和管理。以下是Thread.h和Thread.cpp的实现。

Thread.h

#include <functional>
#include <atomic>
#include <cstdint>
#include <thread>

class Thread {
public:
    using ThreadFunc = std::function<void(std::uint32_t)>;

public:
    explicit Thread(ThreadFunc func);

    void join();

    ~Thread();

    void start();

    [[nodiscard]] std::uint32_t getID() const;

    [[nodiscard]] static std::uint32_t getNumCreated();

    Thread(const Thread &) = delete;

    Thread &operator=(const Thread &) = delete;

private:
    ThreadFunc m_func;
    uint32_t m_threadID;
    std::thread m_thread;
    static std::atomic<uint32_t> m_numCreateThread;
};

Thread.cpp

#include "Thread.h"

std::atomic<uint32_t> Thread::m_numCreateThread(0);

Thread::Thread(Thread::ThreadFunc func) : m_func(std::move(func)), m_threadID(m_numCreateThread.load()) {
    m_numCreateThread++;
}

void Thread::start() {
    m_thread = std::thread([this]() {
        m_func(m_threadID);
    });
    m_thread.detach();
}

uint32_t Thread::getID() const {
    return m_threadID;
}

uint32_t Thread::getNumCreated() {
    return Thread::m_numCreateThread.load();
}

Thread::~Thread() {
    join();
}

void Thread::join() {
    if (m_thread.joinable()) {
        m_thread.join();
    }
}

解析

成员变量:

构造函数:

接受一个函数作为参数,并分配一个唯一的线程ID。

start方法:

启动线程,执行传入的函数,并将线程设为分离状态,以便在线程结束时自动回收资源。

join方法和析构函数:

如果线程可连接,则执行join操作,确保线程资源的正确回收。

ThreadPool类实现

ThreadPool类负责管理多个线程,维护任务队列,并调度任务给线程执行。以下是ThreadPool.h和ThreadPool.cpp的实现。

ThreadPool.h

#include <mutex>
#include <unordered_map>
#include <memory>
#include <functional>
#include <queue>
#include <iostream>
#include <condition_variable>
#include <future>
#include <cstdint>
#include "Thread.h"

enum class THREAD_MODE {
    MODE_CACHED,
    MODE_FIXED,
};

class ThreadPool {
public:
    explicit ThreadPool(THREAD_MODE mode = THREAD_MODE::MODE_CACHED, std::uint32_t maxThreadSize = 1024,
                        std::uint32_t initThreadSize = 4, std::uint32_t maxTaskSize = 1024);

    ~ThreadPool();

    void setThreadMaxSize(uint32_t maxSize);
    void setMode(THREAD_MODE mode);
    void setTaskMaxSize(uint32_t maxSize);
    void start(uint32_t taskSize = std::thread::hardware_concurrency());

    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    template<typename Func, typename ...Args>
    auto submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type>;

protected:
    [[nodiscard]] bool checkState() const;
    void ThreadFun(uint32_t threadID);

private:
    using Task = std::function<void()>;

    std::unordered_map<uint32_t, std::unique_ptr<Thread>> m_threads;
    uint32_t m_initThreadSize; // 初始线程数量
    std::atomic<std::uint32_t> m_spareThreadSize; // 空闲线程数量
    uint32_t m_maxThreadSize; // 最大线程数量
    std::atomic<bool> m_isRunning; // 线程池运行标志
    THREAD_MODE m_mode; // 线程池运行模式

    std::deque<Task> m_tasks;
    std::atomic<uint32_t> m_taskSize;
    uint32_t m_maxTaskSize;

    uint32_t m_thread_maxSpareTime;

    mutable std::mutex m_mutex; // 线程池互斥量
    std::condition_variable m_notEmpty;
    std::condition_variable m_notFull;
    std::condition_variable m_isExit;
};

ThreadPool.cpp

#include "ThreadPool.hpp"
#include <thread>

ThreadPool::ThreadPool(THREAD_MODE mode, uint32_t maxThreadSize, uint32_t initThreadSize,
                       uint32_t maxTaskSize) : m_initThreadSize(initThreadSize), m_spareThreadSize(0),
                                               m_maxThreadSize(maxThreadSize), m_isRunning(false), 
                                               m_mode(mode), m_taskSize(0), m_maxTaskSize(maxTaskSize), 
                                               m_thread_maxSpareTime(60) {
}

bool ThreadPool::checkState() const {
    return m_isRunning;
}

void ThreadPool::setThreadMaxSize(uint32_t maxSize) {
    if (checkState())
        std::cerr << "threadPool is running, cannot change!" << std::endl;
    else
        this->m_maxThreadSize = maxSize;
}

void ThreadPool::setMode(THREAD_MODE mode) {
    if (checkState())
        std::cerr << "threadPool is running, cannot change!" << std::endl;
    else
        this->m_mode = mode;
}

void ThreadPool::setTaskMaxSize(uint32_t maxSize) {
    if (checkState())
        std::cerr << "threadPool is running, cannot change!" << std::endl;
    else
        this->m_maxTaskSize = maxSize;
}

void ThreadPool::ThreadFun(uint32_t threadID) {
    auto last_time = std::chrono::high_resolution_clock::now();
    for (;;) {
        Task task;
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            std::cout << "threadID: " << threadID << " trying to get a task" << std::endl;
            while (m_tasks.empty() && m_isRunning) {
                if (m_mode == THREAD_MODE::MODE_CACHED && m_threads.size() > m_initThreadSize) {
                    if (m_notEmpty.wait_for(lock, std::chrono::seconds(3)) == std::cv_status::timeout) {
                        auto now_time = std::chrono::high_resolution_clock::now();
                        auto dur_time = std::chrono::duration_cast<std::chrono::seconds>(now_time - last_time);
                        if (dur_time.count() > m_thread_maxSpareTime && m_threads.size() > m_initThreadSize) {
                            m_threads.erase(threadID);
                            m_spareThreadSize--;
                            std::cout << "threadID: " << threadID << " exiting due to inactivity!" << std::endl;
                            return;
                        }
                    }
                } else {
                    m_notEmpty.wait(lock);
                }
            }
            if (!m_isRunning && m_tasks.empty()) {
                m_threads.erase(threadID);
                std::cout << "threadID: " << threadID << " exiting!" << std::endl;
                m_isExit.notify_all();
                return;
            }

            if (!m_tasks.empty()) {
                m_spareThreadSize--;
                task = std::move(m_tasks.front());
                m_tasks.pop_front();
                std::cout << "threadID: " << threadID << " retrieved a task!" << std::endl;
                if (!m_tasks.empty())
                    m_notEmpty.notify_all();
                m_notFull.notify_all();
            }
        }
        if (task) {
            try {
                task();
            } catch (const std::exception &e) {
                std::cerr << "Exception in task: " << e.what() << std::endl;
            } catch (...) {
                std::cerr << "Unknown exception in task." << std::endl;
            }
            std::cout << "threadID: " << threadID << " completed a task." << std::endl;
            m_spareThreadSize++;
            last_time = std::chrono::high_resolution_clock::now();
        }
    }
}

void ThreadPool::start(std::uint32_t taskSize) {
    m_isRunning = true;
    for (std::uint32_t i = 0; i < taskSize; ++i) {
        auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));
        auto threadID = ptr->getID();
        m_threads.emplace(threadID, std::move(ptr));
    }
    for (auto &it: m_threads) {
        it.second->start();
        m_spareThreadSize++;
    }
}

ThreadPool::~ThreadPool() {
    m_isRunning = false;
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notEmpty.notify_all();
    m_notFull.notify_all();
    m_isExit.wait(lock, [&]() -> bool { return m_threads.empty(); });
}

submitTask模板方法实现

template<typename Func, typename ...Args>
auto ThreadPool::submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type> {
    using Rtype = typename std::invoke_result<Func, Args...>::type;
    auto task = std::make_shared<std::packaged_task<Rtype()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    std::future<Rtype> result = task->get_future();
    std::unique_lock lock(m_mutex);
    if (!m_notFull.wait_for(lock, std::chrono::seconds(3),
                            [&]() -> bool { return m_tasks.size() < m_maxTaskSize; })) {
        std::cerr << "Task queue is full, submit task failed!" << std::endl;
        throw std::runtime_error("Task queue is full");
    }
    m_tasks.emplace_back([task] { (*task)(); });
    m_notEmpty.notify_all();

    if (m_mode == THREAD_MODE::MODE_CACHED && m_tasks.size() > m_spareThreadSize) {
        if (m_threads.size() >= m_maxThreadSize) {
            std::cerr << "Thread pool has reached max size, cannot create new thread!" << std::endl;
        } else {
            std::cout << "Creating a new thread!" << std::endl;
            auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));
            u_int64_t threadID = ptr->getID();
            m_threads.emplace(threadID, std::move(ptr));
            m_threads[threadID]->start();
            ++m_spareThreadSize;
        }
    }
    return result;
}

解析

成员变量:

构造函数:

初始化线程池的各项参数,如模式、最大线程数、初始线程数、最大任务数等。

start方法:

启动线程池,创建初始数量的线程,并将其启动。

submitTask模板方法:

ThreadFun方法:

析构函数:

关闭线程池,通知所有线程退出,并等待所有线程结束。

线程池的使用

以下是一个简单的示例,展示如何使用上述实现的线程池。

#include "ThreadPool.h"
#include <iostream>
#include <chrono>

// 示例任务函数
void exampleTask(int n) {
    std::cout << "Task " << n << " is starting." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Task " << n << " is completed." << std::endl;
}

int main() {
    // 创建线程池,使用缓存模式,最大线程数为8,初始线程数为4,最大任务数为16
    ThreadPool pool(THREAD_MODE::MODE_CACHED, 8, 4, 16);
    pool.start();

    // 提交多个任务
    std::vector<std::future<void>> futures;
    for (int i = 0; i < 10; ++i) {
        futures.emplace_back(pool.submitTask(exampleTask, i));
    }

    // 等待所有任务完成
    for (auto &fut : futures) {
        fut.get();
    }

    std::cout << "All tasks have been completed." << std::endl;
    return 0;
}

运行结果

threadID: 0 trying to get a task
threadID: 1 trying to get a task
threadID: 2 trying to get a task
threadID: 3 trying to get a task
Task 0 is starting.
Task 1 is starting.
Task 2 is starting.
Task 3 is starting.
threadID: 0 completed a task.
threadID: 0 trying to get a task
Task 4 is starting.
threadID: 1 completed a task.
threadID: 1 trying to get a task
Task 5 is starting.
...
All tasks have been completed.

以上就是使用C++实现一个高效的线程池的详细内容,更多关于C++线程池的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文