Linux

关注公众号 jb51net

关闭
首页 > 网站技巧 > 服务器 > Linux > Linux环形队列的生产消费者模型

Linux基于环形队列的生产消费者模型详解

作者:s_little_monster_

这篇文章主要介绍了Linux基于环形队列的生产消费者模型方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

一、POSIX信号量

1、概述

在我们进行环形队列的生产消费者模型的学习之前,我们要对前置条件POSIX信号量进行学习,这里的POSIX的信号量与systemV的信号量是几乎一致的,都是用于同步操作,达到无冲突的访问共享资源的目的,只是POSIX信号量的使用要更简单一些,可以用于线程间同步

信号量的本质就是一个计数器,它的本质就是用来描述资源数目的,把资源是否就绪放到了临界区之外,在申请信号量的时候其实已经就是间接在做判断了

2、调用接口

(一)初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

(二)销毁信号量

#include <semaphore.h>
int sem_destroy(sem_t *sem);

(三)等待信号量

#include <semaphore.h>
int sem_wait(sem_t *sem);

sem_wait 函数执行的是信号量的 P 操作

(四)发布信号量

#include <semaphore.h>
int sem_post(sem_t *sem);

sem_post 函数执行的是信号量的 V 操作,会将信号量 sem 的值加 1

3、在环形队列中的作用

我们在之前应该都接触过环形队列,在环形队列中,一般我们是需要一个计数器的,或者在环形队列中留出最后一个位置,因为如果没有这些措施,我们就不知道双指针谁在前谁在后了,我们这里使用信号量替代了这个计数器

二、基于环形队列的生产消费者模型

1、理论探究

我们通过数组以及模运算的方式来模拟环状模型,前面的基于阻塞队列的生产消费者模型底层来说是基于容器queue的,其空间可以动态分配,现在是基于固定大小的,基于容器vector

其中生产者关注的是环形队列的空间资源,消费者关心的是环形队列的数据资源,而环形队列中的空间资源+数据资源=全部资源,只要有空间生产者就可以生产数据然后放入,只要有数据消费者就可以取出数据然后加工

2、代码实现

(一)RingQueue.hpp

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
//环形队列默认容量
const static int defaultcap = 8;
//环形队列核心接口:PV操作以及加锁解锁
template<class T>
class RingQueue{
private:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
	//初始化
    RingQueue(int cap = defaultcap)
    :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
    {
        sem_init(&cdata_sem_, 0, 0);
        sem_init(&pspace_sem_, 0, cap);
		//生产者消费者的锁
        pthread_mutex_init(&c_mutex_, nullptr);
        pthread_mutex_init(&p_mutex_, nullptr);
    }
    void Push(const T &in) // 生产活动
    {
    	//调用P函数检查队列中是否有可用空间,没有可用空间线程会阻塞
        P(pspace_sem_);
		//这里为什么要先P后加锁,下面详谈
        Lock(p_mutex_); 
        ringqueue_[p_step_] = in;
        // 位置后移,维持环形特性
        p_step_++;
        p_step_ %= cap_;
        Unlock(p_mutex_); 

        V(cdata_sem_);

    }
    void Pop(T *out)       // 消费活动
    {
        P(cdata_sem_);

        Lock(c_mutex_); 
        *out = ringqueue_[c_step_];
        // 位置后移,维持环形特性
        c_step_++;
        c_step_ %= cap_;
        Unlock(c_mutex_); 

        V(pspace_sem_);
    }
    //析构销毁
    ~RingQueue()
    {
        sem_destroy(&cdata_sem_);
        sem_destroy(&pspace_sem_);

        pthread_mutex_destroy(&c_mutex_);
        pthread_mutex_destroy(&p_mutex_);
    }
private:
    std::vector<T> ringqueue_;// 环形队列的底层实现
    int cap_;		   		  // 队列容量

    int c_step_;       		  // 消费者下标
    int p_step_;      		  // 生产者下标

    sem_t cdata_sem_; 		  // 队中可用数据资源
    sem_t pspace_sem_;		  // 队中可用空间资源

    pthread_mutex_t c_mutex_; // 消费者锁
    pthread_mutex_t p_mutex_; // 生产者锁
};

(二)Task.hpp

任务函数还是上一次的任务

#pragma once
#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task()
    {}
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {}
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {}

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

(三)main.cpp

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"

using namespace std;
//这个结构体是方便我们打印的时候查看方便的
struct ThreadData
{
    RingQueue<Task> *rq;   //环形队列
    std::string threadname;//线程名字
};

void *Productor(void *args)
{
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    int len = opers.size();
    while (true)
    {
        // 模拟获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        // 生产数据
        rq->Push(t);
        cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;

        sleep(1);
    }
    return nullptr;
}

void *Consumer(void *args)
{
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;

    while (true)
    {
        // 消费数据
        Task t;
        rq->Pop(&t);
       
        // 处理数据
        t();
        cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
    }
    return nullptr;
}

int main()
{
    srand(time(nullptr));
    RingQueue<Task> *rq = new RingQueue<Task>(10);

    pthread_t c[5], p[3];
    
	//这里我们为了方便查看,统一用单生产单消费
    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Productor-" + std::to_string(i);

        pthread_create(p + i, nullptr, Productor, td);
    }
    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Consumer-" + std::to_string(i);

        pthread_create(c + i, nullptr, Consumer, td);
    }

    for (int i = 0; i < 1; i++)
    {
        pthread_join(p[i], nullptr);
    }
    for (int i = 0; i < 1; i++)
    {
        pthread_join(c[i], nullptr);
    }

    return 0;
}

3、PV操作包裹住加解锁操作的原因

PopPush 函数中,以Push 函数为例,P(pspace_sem_)V(cdata_sem_) 包裹着 Lock(p_mutex_)Unlock(p_mutex_) 这种设计是为了实现更细粒度的同步控制,尽可能减少锁的竞争,以确保线程安全和高效性,下面详细解释其原因:

P(pspace_sem_)Lock(p_mutex_) 之前:

V(cdata_sem_)Unlock(p_mutex_) 之后:

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文