C++线程安全容器stack和queue的使用详细介绍
作者:TTOR
要构建线程安全的数据结构, 关注几点:
- 若某线程破坏了数据结构的不变量, 保证其他线程不能看到
- 提供的操作应该完整,独立, 而非零散的分解步骤避免函数接口固有的条件竞争(比如之前提到的empty和top和pop)
线程安全的容器栈threadsafe_stack
入门(3)里曾介绍过线程安全的stack容器, 这里把代码搬过来再分析
逐项分析, 该代码如何实现线程安全的
template<typename T> class threadsafe_stack { private: stack<T> data; mutable mutex m; public: threadsafe_stack(){} threadsafe_stack(const threadsafe_stack &other) { lock_guard lock1(other.m); data=other.data; } threadsafe_stack &operator=(const threadsafe_stack &) = delete; void push(T new_value) { lock_guard lock1(m); data.push(move(new_value)); //1 } shared_ptr<T> pop() { lock_guard lock1(m); if (data.empty()) { throw empty_stack(); //2 } shared_ptr<T> const res(make_shared<T>(move(data.top()))); //3 data.pop(); //4 return res; } void pop(T &value) { lock_guard lock1(m); if (data.empty()) { throw empty_stack(); } value = move(data.top()); //5 data.pop(); //6 } bool empty() const //7 { lock_guard lock1(m); return data.empty(); } };
首先, 每个操作都对互斥加锁, 保证基本线程安全
其次, 在多线程下, 对于std::stack容器, empty(), top(), pop()存在接口上的数据竞争(见入门(3)说明), 于是threadsafe_stack把这些调用集合到一个函数pop()里, 以实现线程安全. 其中pop()函数里若与遇栈空, 直接抛出异常
接着分析:
1处data.push()可能抛出异常: 原因是复制/移动时抛出异常或stack容器扩展容量时遇上内存分配不足, 但无论哪种, std::stack<>能保证自身的安全
2处抛出的异常: 没有改动数据, 安全的抛出行为
3处共享指针的创建可能抛出异常: 内存不足或移动/复制相关的构造函数抛出异常,但两种情形c++都能保证不会出现内存泄漏, 并且此时数据还未改动(data.pop()时才改动),
4处data.pop()的实质操作是返回结果, 绝不会抛出异常,结合3, 所以这是异常安全的重载函数pop()
5,6处和3,4处类似, 不同之处是没用创建新共享指针, 但此时数据也没被改动, 也是安全的重载函数pop()
最后7处empty()不改动任何数据, 是异常安全的函数
从内存和数据结构安全方面来说没用问题
然而,这段代码可能造成死锁:
因为在持锁期间, 有可能执行以下用户自定义函数:
用户自定义的复制构造函数(1 3处的res构造), 移动构造函数(3处的make_share), 拷贝赋值操作和移动赋值操作(5处), 用户也可能重载了new和delete.
当在这些函数里, 若是再次调用了同个栈的相关函数, 会再次申请获取锁, 然而之前的锁还没释放, 因此造成死锁
以下是我想到的一种死锁方式(正常情况应该不会这么写, 但是设计时必须要考虑)
class A; threadsafe_stack<A> s; class A { public: A(A&& a)//2->然后这里使用s.pop(),之前锁没释放, 造成了死锁 { s.pop(); } A(){} }; int main() { s.push(A()); //1->临时对象A()在s.push()里被move进内置data时, 会调用A的移动构造函数 return 0; }
向栈添加/移除数据, 不可能不涉及复制行为或内存行为, 于是只能对栈的使用者提出要求: 让使用者来保证避免死锁
栈的各成员函数都有lock_guard保护数据, 因此同时调用的线程没有数量限制.
仅有构造函数和析构函数不是安全行为, 但无论是没构造完成还是销毁到一半, 从而转去调用成员函数, 这在有无并发情况下都是不正确的.
所以, 使用者必须保证: 栈容器未构造完成时不能访问数据, 只有全部线程都停止访问时, 才可销毁容器
线程安全的容器队列threadsafe_queue
自定义一个threadsafe_queue, 并且上面对于线程安全的大多数分析在这也成立
template<typename T> class threadsafe_queue { private: queue<T> data; mutable mutex m; condition_variable condition; public: threadsafe_queue() {} threadsafe_queue(const threadsafe_queue &other) { lock_guard lock1(other.m); data = other.data; } threadsafe_queue &operator=(const threadsafe_queue &) = delete; void push(T new_value) { lock_guard lock1(m); data.push(move(new_value)); condition.notify_one(); //1 } void wait_and_pop(T &value) //2 { lock_guard lock1(m); condition.wait(lock1, [this] { return !data.empty(); }); value = move(data.top()); data.pop(); } shared_ptr<T> wait_and_pop() //3 { lock_guard lock1(m); condition.wait(lock1, [this] { return !data.empty(); }); shared_ptr<T> const res(make_shared<T>(move(data.top()))); //4 创建shared_ptr可能出现异常 data.pop(); return res; } shared_ptr<T> try_pop() { lock_guard lock1(m); if (data.empty()) { return shared_ptr<T>(); //5 } shared_ptr<T> const res(make_shared<T>(move(data.top()))); data.pop(); return res; } bool try_pop(T &value) { lock_guard lock1(m); if (data.empty()) { return false; } value = move(data.top()); data.pop(); } bool empty() const { lock_guard lock1(m); return data.empty(); } };
区别:
发现队列通常用于消费者/生产者模型, 因此实现阻塞的取值函数wait_and_pop, 即当调用时队列若空, 阻塞等待, 直到push数据后调用condition.notify_one()
同时也提供了非阻塞的取值函数try_pop
然而这一实现会有问题:
假如有多个线程同时等待, condition.notify_one()只能唤醒其中一个,若该唤醒的线程执行wait_and_pop之后的代码抛出异常(例如4处res的创建), 此时队列里还有数据,却不会有其他任何线程被唤
如果我们因不能接受这种行为方式, 而只是简单的把notify_one改为notify_all,这样每次push数据后都会唤醒所有的等待线程. 由于只push了1个数据, 大多数线程醒来后发现队列还是为空, 还得继续等待, 这将大大增加开销
第二种解决种方法是若wait_and_pop抛出异常则再次调用notify_one
第三种方法是让std::queue存储share_ptr<T>, share_ptr的初始化移动到push的调用处, 从内部复制shared_ptr<>实例则不会抛出异常
这里采用第三种方法, 还会有额外的好处: push里为shared_ptr分配内存操作在加锁之前, 缩短了互斥加锁的时间, 由于分配内存通常是耗时的操作, 因此这样非常有利于增强性能
template<typename T> class threadsafe_queue { private: queue<shared_ptr<T>> data; mutable mutex m; condition_variable condition; public: threadsafe_queue() {} threadsafe_queue(const threadsafe_queue &other) { lock_guard lock1(other.m); data = other.data; } threadsafe_queue &operator=(const threadsafe_queue &) = delete; void push(T new_value) { //分配内存在加锁操作之前 shared_ptr<T> value(make_shared<T>(move(new_value))); lock_guard lock1(m); data.push(value); condition.notify_one(); } void wait_and_pop(T &value) { lock_guard lock1(m); condition.wait(lock1, [this] { return !data.empty(); //队列空则等待 }); value = move(*data.front()); //先取值, 再存入参数value data.pop(); } bool try_pop(T &value) { lock_guard lock1(m); if (data.empty()) { return false; //队列空返回false } value = move(*data.front()); //先取值, 再存入参数value data.pop(); return true; } shared_ptr<T> wait_and_pop() { lock_guard lock1(m); condition.wait(lock1, [this] { return !data.empty(); //队列空则等待 }); shared_ptr<T> res = data.front(); //取出结果返回给外部 data.pop(); return res; } shared_ptr<T> try_pop() { lock_guard lock1(m); if (data.empty()) { return shared_ptr<T>(); //队列空返回空shared_ptr } shared_ptr<T> res = data.front();//取出结果返回给外部 data.pop(); return res; } bool empty() const { lock_guard lock1(m); return data.empty(); } };
到此这篇关于C++线程安全容器stack和queue的使用详细介绍的文章就介绍到这了,更多相关C++ stack和queue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!