C++ vector在多线程操作中出现内存错误问题及解决
作者:mhrobot
vector在多线程操作中出现内存错误问题
C++ vector的reserve和resize详解
reserve是容器预留空间,但在空间内不真正创建元素对象,所以在没有添加新的对象之前,不能引用容器内的元素。加入新的元素时,要调用push_back()/insert()函数。
resize是改变容器的大小,且在创建对象,因此,调用这个函数之后,就可以引用容器内的对象了,因此当加入新的元素时,用operator[]操作符,或者用迭代器来引用元素对象。此时再调用push_back()函数,是加在这个新的空间后面的。
vector在多线程中操作举例:
有一个全局变量 vector goods_list;
在A线程中从服务器获取最新商品列表,goods_list.push_back()
在B线程中不断的下载商品图片,
Goods &goods = goods_list.at(i)
读取goods .pic_url,下载完成后赋值 goods.local_pic = local_pic
以上简单的逻辑,缺导致程序崩溃,提示内存写入错误。
调试定位到goods.local_pic = local_pic这一句。
估计就是多线程的问题。
查了一下资料,原来vector每次push_back都会重新分配内存,导致goods 这个引用无效,所以goods.local_pic = local_pic赋值写入的时候就会写入到一个无效的地址,导致程序崩溃。
解决办法
加锁也可以解决这个问题,不过那样太低效了,不予考虑。最后的解决方案是,用vector的reserve方法预先分配好内存,免得在使用中动态增长。
在构造函数中提前对goods_list.reserve(30000)分配足够的固定内存,这样就不用每次pushback都申请增加内存、重新分配内存 导致的原内存地址无效,而且效率也高很多。
跨平台使用C++ vector的多线程问题
源起
最近碰到一个linux下程序崩溃的问题,涉及到vector的多线程使用的问题。由于是第二次折腾这个问题,所以把过程记录下来。
简单介绍一下背景:程序为windows和linux跨平台使用。使用一套代码,会分别编译两个平台下的不同版本。
程序涉及的结构。使用了一个全局变量的vector来保存数据,有两个线程,一个线程是周期执行的,每个周期开始时检查全局变量中是否有数据,如果有就取出来处理,然后清空。另一个线程等待外部输入数据,如果有数据就放进vector。
示例代码如下:
#include "stdafx.h"
#include <vector>
typedef void *(TASK_ENTRY_POINT)(void *);
const unsigned short Task_Priority_Base = 50; // 基本优先级
#if defined WIN32 // windows 操作系统
typedef void * SIGNAL_HANDLE;
#else // 标准linux 操作系统
#include <semaphore.h>
typedef sem_t * SIGNAL_HANDLE;
#endif
void SleepUs(unsigned long us)
{
#if defined WIN32 // windows 操作系统
::Sleep(us);
#else // 标准linux 操作系统
if (us>60000) // > 1min
sleep(us/1000000);
else
usleep(us);
#endif
}
void SleepMs(unsigned long Ms)
{
#if defined WIN32 // windows 操作系统
::Sleep(Ms);
#else // 标准linux 操作系统
if (Ms>60000) // > 1min
sleep(Ms/1000);
else
usleep(Ms*1000);
#endif
}
static int running_tasks = 0;
void CreateTask(TASK_ENTRY_POINT EntryFunc,int Priority=0, void *ArgP = NULL);
typedef std::vector<int > vectorType;
vectorType testvec;
bool init_task_library()
{
running_tasks = 0;
int St=0;
// 设置调度策略或基本优先级
#if defined WIN32 // windows 操作系统
SetPriorityClass(GetCurrentProcess(),HIGH_PRIORITY_CLASS);
#else // 标准linux 操作系统
sched_param Param;
int PriorityMax,PriorityMin;
// 禁止内存交换
// St=mlockall(MCL_CURRENT|MCL_FUTURE);
// 设置优先级
PriorityMax = sched_get_priority_max(SCHED_RR);
PriorityMin = sched_get_priority_min(SCHED_RR);
if (Task_Priority_Base>PriorityMax)
Param.__sched_priority = PriorityMax;
else
Param.__sched_priority = Task_Priority_Base;
St=sched_setscheduler(0,SCHED_RR,&Param);
#endif
return true;
}
void CreateTask(TASK_ENTRY_POINT EntryFunc,int Priority, void *ArgP)
{
static bool LibInit=false;
if (!LibInit)
LibInit=init_task_library();
assert(LibInit);
assert(EntryFunc);
#if defined WIN32 // windows 操作系统
HANDLE Tid;
Tid=(HANDLE)::_beginthread((void (*)(void *))EntryFunc, 0, ArgP);
::SetThreadPriority(Tid,Priority);
#else // 标准linux 操作系统
int St;
pthread_t Tid;
sched_param Param;
int Policy;
// 创建线程
St = pthread_create(&Tid,NULL,EntryFunc,ArgP);
assert(St==0);
pthread_detach(Tid);
// 设置线程参数
Policy = SCHED_RR;
Param.__sched_priority=Task_Priority_Base + Priority;
St=pthread_setschedparam(Tid,Policy,&Param);
#endif
++running_tasks;
}
void *addElement(void *ArgP)
{
int i=0;
for(int i=0;i<10000;i++)
{
int res=i;
for(int j=0;j<1000;j++)
{
testvec.push_back(res);
}
printf("addelement %d \n",res);
SleepMs(100);
}
printf("addelement done\n");
return NULL;
}
void *clearElement(void *ArgP)
{
int i=0;
for(int i=0;i<100000000;i++)
{
if(testvec.size()>0)
{
for(vectorType::iterator ite=testvec.begin(); ite!=testvec.end(); ++ite)//崩点1
{
{
printf("get %d size=%d begin\n",(*ite),testvec.size());//,&*(testvec.begin()));//崩点2
}
}
printf("clear\n");
testvec.clear();//崩点3
}
SleepMs(1);
}
printf("clearelement done\n");
return NULL;
}
int main(int argc, char* argv[])
{
CreateTask(addElement);
CreateTask(clearElement);
printf("done\n");
while(1)
{
SleepMs(10000);
}
return 0;
}reserve问题
真实代码中周期执行的线程大部分时间在sleep,而接收数据的线程也在很少的情况下才会收到数据,因此运行了很长时间也没有出现问题。但是示例代码中,不管是linux还是windows下,却是一跑就崩的。在window下报“vector iterators incompatible” ,在linux下直接segfault。
首先说的是reserve问题。
vector的内存是动态分配的,因此只要vector的大小超过了当前的大小,就会重新开辟一块新的内存,大小为现在大小的两倍,这就导致了如果大小涨了的话,内存会变化的。而如果一个线程里在一直写,另一个线程里用iterator来读,那么如果第一个线程里内存已经变了,读的线程还用原来的地址,就会导致程序崩溃。
这个问题还是比较好解决的。就是首先为vector保留内存大小。使用reserve函数
对代码的改进:
main函数改为:
int main(int argc, char* argv[])
{
char name[1024];
sprintf(name, "testSyncMutex");
testMutex = CreateTrigger(name);
FireTrigger(testMutex);
testvec.reserve(10000000);
printf("hello world\n");
CreateTask(addElement);
CreateTask(clearElement);
printf("done\n");
while(1)
{
SleepMs(10000);
}
return 0;
}clear问题
经过了reserve的修改,生产环境的代码大概率不会崩了,但是小概率事件在大基数面前也会出现。程序还是崩了。再次检视了生产代码,觉得应该加个锁了。
vector并不是线程安全的,所以,虽然生产环境下概率比较小,但是仍然是存在漏洞的。就比如示例代码,仍然会崩。
一个线程里写,另一个线程里读,而且可能崩在任何读的地方(代码注释崩点1~崩点3)。
那么下面就是怎么改了,通过信号量来加锁。增加函数
SIGNAL_HANDLE testMutex;
// 创建信号灯
SIGNAL_HANDLE CreateTrigger(const char* SigName)
{
assert(SigName);
#if defined WIN32 // windows 操作系统
return ::CreateSemaphoreA(NULL,0,1,SigName);
/*return ::CreateEvent( NULL, // no security attributes
FALSE, // auto reset
FALSE, // initially not signaled
SigName); // name of mutex
*/
#else // 标准linux 操作系统
sem_t * SemP;
SemP=new sem_t;
assert(SemP);
int St=sem_init(SemP,0,0); // 线程间共享,初值为0
assert(St != -1);
return SemP;
// return sem_open(SigName,O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH,0);
#endif
}
// 释放信号灯
void FreeTrigger(SIGNAL_HANDLE Handle)
{
assert(Handle);
#if defined WIN32 // windows 操作系统
::CloseHandle(Handle);
#else // 标准linux 操作系统
sem_destroy(Handle);
delete Handle;
//sem_close(Handle);
#endif
}
// 触发信号灯
void FireTrigger(SIGNAL_HANDLE Handle)
{
assert(Handle);
#if defined WIN32 // windows 操作系统
//::SetEvent(Handle);
ReleaseSemaphore (Handle,1,NULL);
#else // 标准linux 操作系统
sem_post(Handle);
#endif
}
// 等待信号灯
void WaitTrigger(SIGNAL_HANDLE Handle)
{
assert(Handle);
#if defined WIN32 // windows 操作系统
::WaitForSingleObject(Handle, INFINITE);
#else // 标准linux 操作系统
sem_wait(Handle);
#endif
}main函数改为:
int main(int argc, char* argv[])
{
char name[1024];
sprintf(name, "testSyncMutex");
testMutex = CreateTrigger(name);
FireTrigger(testMutex);
testvec.reserve(10000000);
CreateTask(addElement);
CreateTask(clearElement);
while(1)
{
SleepMs(10000);
}
return 0;
}addElement函数改为
void *addElement(void *ArgP)
{
int i=0;
int res=0;
for(int i=0;i<10000;i++)
{
for(int j=0;j<1000;j++)
{
WaitTrigger(testMutex);
res=i*1000+j;
testvec.push_back(res);
FireTrigger(testMutex);
}
SleepMs(100);
}
return NULL;
}clearElement中有两种改法,一种是对整个循环加锁。
代码如下:
void *clearElement(void *ArgP)
{
int i=0;
for(int i=0;i<100000000;i++)
{
i++;
if(testvec.size()>0)
{
WaitTrigger(testMutex);
for(vectorType::iterator ite=testvec.begin(); ite!=testvec.end(); ++ite)
{
printf("get %d size=%d it%x\n",(*ite),testvec.size(),&*ite);
}
testvec.clear();
FireTrigger(testMutex);
}
SleepMs(1);
}
return NULL;
}但是这种做法的副作用也很明显,如果读的线程中执行的操作较多或需要执行的数据条数较多,可能会占用写线程中的执行时间。
另一种改法是加锁的位置更加分散,如下:
void *clearElement(void *ArgP)
{
int i=0;
for(int i=0;i<100000000;i++)
{
if(testvec.size()>0)
{
WaitTrigger(testMutex);
for(vectorType::iterator ite=testvec.begin(); ite!=testvec.end(); WaitTrigger(testMutex),++ite)
{
printf("get %d size=%d it%x\n",(*ite),testvec.size(),&*ite);
FireTrigger(testMutex);
}
testvec.clear();
FireTrigger(testMutex);
}
SleepMs(1);
}
return NULL;
}注意
最后用这种方法修复了错误。一个感受就是,墨菲定律。程序中可能出错的地方,一定会出错。所以在用到非线程安全的容器时一定要注意加保护。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
