Linux

关注公众号 jb51net

关闭
首页 > 网站技巧 > 服务器 > Linux > Linux之生产者消费者模型

Linux之生产者消费者模型用法解读

作者:郭二哈

本文详细解析生产者消费者模型,通过超市模型解释其优化原因,并探讨其线程安全问题,并介绍阻塞队列实现细节及POSIX信号量在其中的应用,强调全面覆盖模型原理与实现细节

一、生产者消费者模型

举个例子:


我们现实生活中:工厂,超市,人[消费者]之间的关系就是一个典型的生产者消费者模型

这个模型的基本工作流程是:

现实生活中,为什么会优化出这样的一种模型?

为什么消费者不直接去工厂买东西?

主要有以下3个原因:

为了效率

1.对于工厂来说,消费者如果直接来买商品,一个消费者一次购买的商品数量非常有限,而且一个工厂的产品种类一般并不多,所以消费者还得去不同工厂买东西,工厂为了防止卖不完,只能减少商品的生产,即降低了工厂的生产效率,但是如果卖给超市,就可以一次性卖一大车货物,多找几个超市,就可以做到:生产多少卖多少了

2.对于消费者,工厂的占地面积比较大,所以一般都距离居民地比较远,而超市一般都比工厂占地面积小的多,离消费者很近,所以消费者去超市买东西比去工厂更快,而且一个工厂的产品种类一般并不多,但是超市售卖的产品种类繁多

有了超市就可以做到生产者和消费者之间的解藕

1.一个工厂关门了,超市换一个工厂进货就行了,只要进的货还是一样的,就丝毫不影响消费者消费。事实就是如此,我们消费者从超市买东西时,根本就不知道这个商品是哪个工厂生产的,我们也不关心消费者只管购买商品就行了

2.到超市消费者是谁,是什么群体,工厂也不知道,也不关心,这个是超市该关心的,因为工厂只把商品卖给超市,超市卖给谁和工厂无关,工厂只管生产就行了

3.有了超市的存在

支持忙闲不均

有了超市这个缓冲区的存在

1.在购物潮之前,超市就可以通知工厂多生产一些商品,超市多进货,方便应对更多消费者

2.在囤积的商品多的时候,超市也可以搞活动吸引消费者,并且通知工厂生产地慢一点

超市本质上就是工厂和消费者之间的缓存

即:超市支持工厂“预加载”产品,消费者可以一定程度“预定”产品

上述例子对应到计算机中的生产者消费者模型就是:

和线程安全再对应一下:

一共有3种关系:

1.生产者和生产者之间:互斥

2.消费者和消费者之间:互斥

3.生产者和消费者之间:互斥+同步

二、生产者消费者模型的阻塞队列版本

生产者消费者模型一般会使用一个阻塞队列来作为共享资源,进而实现多线程协作

生产者消费者模型的阻塞队列的特点:

阻塞队列类的简单实现

成员变量

1.存储数据的容器

直接使用STL的queue,因为数据的类型不确定,所以阻塞队列类是模板类

2.一把锁

阻塞队列自己会被所有线程看见,所以它是共享资源,所以需要锁来保护自己

要几把锁呢?

因为所有生产者线程之间,所有消费者线程之间,以及生产者和消费者之间

都是互斥的,所以它们得用同一把锁来实现互斥

3.生产者线程的条件变量

因为在满足一定条件(比如:阻塞队列满了,或者阻塞队列满了4/5了等)时,可以让所有生产者线程暂时暂停生产(即去条件变量的等待队列中阻塞)

4.消费者线程的条件变量

因为在满足一定条件(比如:阻塞队列为空,或者阻塞队列空了4/5了等)时,可以让所有消费者线程暂时暂停消费(即去条件变量的等待队列中阻塞)

为什么要搞两个条件变量?

一个条件变量虽然也可以实现生产者线程和消费者线程之间的同步

但是实现起来非常麻烦,而且不能区分条件变量的等待队列下的是生产者线程还是消费者线程

而且

两个条件变量可以很好地支持:
 

生产者消费者模型的第3个优点:忙闲不均

5.int _cap:阻塞队列的最大容量

6.int _csleep_num:在消费者条件变量的等待队列中等待的线程个数

7.int _psleep_num:在生产者条件变量的等待队列中等待的线程个数

6和7成员变量的存在主要是为了方便实现线程之间的互相唤醒机制

(即生产者线程生产了之后,可以唤醒消费者线程来消费,反之同理)

成员函数

    void Equeue(const T& in)
    {
        pthread_mutex_lock(&_mutex);

        //生产者调用
        while(IsFull())
        {
            _psleep_num++;

            cout << "生产者, 进入休眠了:" << _psleep_num << endl;
            pthread_cond_wait(&_full_cond, &_mutex);
            _psleep_num--;
        }

        //100% 队列有空间
        _q.push(in);

        if(_csleep_num > 0)
        {
            pthread_cond_signal(&_empty_cond);
            cout << "唤醒消费者..." << endl;
        }

        pthread_mutex_unlock(&_mutex);
    }

代码细节:

伪唤醒问题的解决

例如:

此时:

1.如果此时是使用if进行“线程是否需要进入条件变量的等待队列"的判断的这个被伪唤醒的线程,重新申请并拿到锁之后,就直接"饿虎出笼"去肆意妄为了

2.如果是使用while进行“线程是否需要进入条件变量的等待队列”的判断的,这个被唤醒的线程,重新申请并拿到锁之后,也还是不能直接出循环,因为要再判断一下循环条件是否不满足了

虽然循环条件是"线程需要进入等待队列"的条件,但是如果这个条件满足,不就意味着线程不应该被唤醒吗?

  T Pop()
    {
        //消费者调用
        pthread_mutex_lock(&_mutex);

        while(IsEmpty())
        {
            _csleep_num++;

            pthread_cond_wait(&_empty_cond, &_mutex);
            _csleep_num--;
        }

        T data = _q.front();
        _q.pop();

        if(_psleep_num > 0)
        {
            pthread_cond_signal(&_full_cond);
            cout << "唤醒生产者.." << endl;
        }

        pthread_mutex_unlock(&_mutex);
        return data;
    }
 bool IsFull()
{
    return _q.size() >= _cap;
}

bool IsEmpty()
{
    return _q.empty();
}

源码

#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>
#include <string>

using namespace std;

int defalutcap = 5;

template<typename T>
class BlockQueue
{
private:

    bool IsFull()
    {
        return _q.size() >= _cap;
    }

    bool IsEmpty()
    {
        return _q.empty();
    }

public:

    BlockQueue(int cap = defalutcap)
        : _cap(cap),
          _csleep_num(0),
          _psleep_num(0)
        {
            pthread_mutex_init(&_mutex, nullptr);
            pthread_cond_init(&_full_cond, nullptr);
            pthread_cond_init(&_empty_cond, nullptr);
        }

    void Equeue(const T& in)
    {
        pthread_mutex_lock(&_mutex);

        //生产者调用
        while(IsFull())
        {
            _psleep_num++;

            cout << "生产者, 进入休眠了:" << _psleep_num << endl;
            pthread_cond_wait(&_full_cond, &_mutex);
            _psleep_num--;
        }

        //100% 队列有空间
        _q.push(in);

        if(_csleep_num > 0)
        {
            pthread_cond_signal(&_empty_cond);
            cout << "唤醒消费者..." << endl;
        }

        pthread_mutex_unlock(&_mutex);
    }

    T Pop()
    {
        //消费者调用
        pthread_mutex_lock(&_mutex);

        while(IsEmpty())
        {
            _csleep_num++;

            pthread_cond_wait(&_empty_cond, &_mutex);
            _csleep_num--;
        }

        T data = _q.front();
        _q.pop();

        if(_psleep_num > 0)
        {
            pthread_cond_signal(&_full_cond);
            cout << "唤醒生产者.." << endl;
        }

        pthread_mutex_unlock(&_mutex);
        return data;
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_full_cond);
        pthread_cond_destroy(&_empty_cond);
    }

private:
    //临界资源
    queue<T> _q;
    //大小
    int _cap;

    pthread_mutex_t _mutex;

    pthread_cond_t _full_cond;
    pthread_cond_t _empty_cond;

    int _csleep_num;//消费者休眠的个数
    int _psleep_num;//生产者休眠的个数
};

三、POSIX信号量

POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但POSIX可以⽤于线程间同步。

初始化信号量

sem_init

作用: 用于初始化一个未命名的POSIX信号量(也称为匿名信号量),通常用于线程间同步或共享内存的进程间同步。

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

返回值

销毁信号量

sem_destroy

int sem_destroy(sem_t *sem);

等待信号量

sem_wait

是 POSIX 信号量的 P操作(等待/获取信号量),用于对信号量进行原子减1操作。

它的主要作用是:

int sem_wait(sem_t *sem); //P()

发布信号量

sem_post

是 POSIX 信号量的 V操作(释放/增加信号量),用于对信号量进行原子加1操作。它的主要作用是:

int sem_post(sem_t *sem);//V()

四、基于环形队列的⽣产消费模型

上⼀节⽣产者-消费者的例⼦是基于queue的,其空间可以动态分配,现在基于固定⼤⼩的环形队列重写这个程序(POSIX信号量):

Sem的封装

#include <iostream>
#include <semaphore.h>
#include <pthread.h>

namespace SemMoudle
{
    const int defaultvalue = 1;
    
    class Sem
    {
    public:

        Sem(unsigned int sem_vlaue = defaultvalue)
        {
            sem_init(&_sem, 0, sem_vlaue);
        }

        void P()
        {
            //等待信号量,会将信号量的值减1
            int n = sem_wait(&_sem);//原子的
            (void)n;
        }

        void V()
        {
            //发布信号量,释放资源,会将信号量的值加1
            int n = sem_post(&_sem);//原子的
            (void)n;
        }

        ~Sem()
        {
            sem_destroy(&_sem);
        }

    private:
        sem_t _sem;
    };
}

Mutex的封装

#pragma once
#include <iostream>
#include <pthread.h>

namespace MutexModue
{
    class Mutex
    {
    public:
        Mutex()
        {
            pthread_mutex_init(&_mutex, nullptr);
        }

        void Lock()
        {
            int n = pthread_mutex_lock(&_mutex);
            (void)n;
        }

        void Unlock()
        {
            int n = pthread_mutex_unlock(&_mutex);
            (void)n;
        }

        ~Mutex()
        {
            pthread_mutex_destroy(&_mutex);
        }

        pthread_mutex_t* Get()
        {
            return &_mutex;
        }

    private:
        pthread_mutex_t _mutex;
    };

    class LockGuard
    {
    public:

        LockGuard(Mutex& mutex):_mutex(mutex)
        {
            _mutex.Lock();
        }

        ~LockGuard()
        {
            _mutex.Unlock();
        }

    private:
        Mutex& _mutex;
    };
}

环形队列

#pragma once

#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"

using namespace std;

static const int  gcap = 5;

using namespace SemMoudle;
using namespace MutexModue;

//环形队列
template<typename T>
class RingQueue
{
public:

    RingQueue(int cap = gcap)
        :_cap(cap),
         _rq(cap),
         _blank_sem(cap),
         _p_step(0),
         _data_sem(0),
         _c_step(0)
        {}
    
    void Equeue(const T& in)
    {
        //生产者
        //1.申请信号量,空位置信号量
        _blank_sem.P();
        {
            LockGuard lockguard(_pmutex);
            //2.生产
            _rq[_p_step] = in;
            //3.更新下标
            ++_p_step;
            //4.维持环形特性
            _p_step %= _cap;
        }

        _data_sem.V();
    }

    void Pop(T* out)
    {
         //消费者
        //1.申请信号量,数据信号量
        _data_sem.P();
        {
            LockGuard lockguard(_cmutex);
            //2.消费
            *out = _rq[_c_step];
            //3.更新下标
            ++_c_step;
            //4.维持环形特性
            _c_step %= _cap;
        }

        _blank_sem.V();
    }

private:
    vector<T> _rq;
    int _cap;

    //生产者
    Sem _blank_sem;//空位置
    int _p_step;

    //消费者
    Sem _data_sem;//数据
    int _c_step;

    //维护多生产,多消费,2把锁
    Mutex _cmutex;
    Mutex _pmutex;
};

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文