C 语言

关注公众号 jb51net

关闭
首页 > 软件编程 > C 语言 > C++生产者与消费者模式

C++实现生产者与消费者模式方式

作者:star-keke

多线程工作池示例:创建固定数量的工作线程,通过条件变量竞争任务队列,确保任务均匀分发,任务队列读写操作由互斥锁保护,避免竞争,使用`notify_one()`唤醒空闲线程,`notify_all()`停止时唤醒所有线程退出,适用于CPU/IO密集型任务

多线程工作池

线程安全保障

任务分发逻辑

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <vector>

// 任务队列类型
using Task = std::function<void()>;

int main() {
    std::mutex mtx;
    std::condition_variable cv;
    std::queue<Task> taskQueue;
    bool stop = false; // 退出标志
    const size_t workerCount = 3; // 工作线程数量
    std::vector<std::thread> workers; // 工作线程列表

    // ========== 工作线程循环:多线程消费任务 ==========
    auto workerLoop = [&](int workerId) {
        while (true) {
            Task task;

            // 加锁,获取任务或检测退出信号
            {
                std::unique_lock<std::mutex> lock(mtx);
                
                // 等待条件:有任务 或 需要停止
                cv.wait(lock, [&]() {
                    return !taskQueue.empty() || stop;
                });

                // 若停止且任务队列为空,退出循环
                if (stop && taskQueue.empty()) {
                    std::cout << "[线程" << workerId << "] 退出工作循环..." << std::endl;
                    break;
                }

                // 取出队列头部任务(多线程竞争,确保线程安全)
                task = std::move(taskQueue.front());
                taskQueue.pop();
                std::cout << "[线程" << workerId << "] 取出任务,准备执行..." << std::endl;
            } // 解锁,避免执行任务时持有锁

            // 执行任务
            if (task) {
                task();
            }
        }
    };

    // ========== 创建多个工作线程 ==========
    for (int i = 0; i < workerCount; ++i) {
        workers.emplace_back(workerLoop, i);
    }

    // ========== 模拟提交任务(生产者逻辑) ==========
    auto submitTask = [&](Task task) {
        std::lock_guard<std::mutex> lock(mtx);
        taskQueue.push(std::move(task));
        std::cout << "提交任务,当前队列大小:" << taskQueue.size() << std::endl;
        cv.notify_one(); // 唤醒一个等待的工作线程
    };

    // 批量提交10个任务
    for (int i = 0; i < 10; ++i) {
        submitTask([i]() {
            std::cout << "执行任务" << i << ":线程ID=" << std::this_thread::get_id() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟任务耗时
        });
    }

    // 等待所有任务执行(可选,也可通过队列状态判断)
    std::this_thread::sleep_for(std::chrono::seconds(3));

    // ========== 停止所有工作线程 ==========
    {
        std::lock_guard<std::mutex> lock(mtx);
        stop = true;
        cv.notify_all(); // 唤醒所有等待的线程,确保全部退出
        std::cout << "\n通知所有线程停止..." << std::endl;
    }

    // 等待所有工作线程结束
    for (auto& worker : workers) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    std::cout << "程序结束" << std::endl;
    return 0;
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文