C++ vector在多线程操作中出现内存错误问题及解决
作者:mhrobot
vector在多线程操作中出现内存错误问题
C++ vector的reserve和resize详解
reserve是容器预留空间,但在空间内不真正创建元素对象,所以在没有添加新的对象之前,不能引用容器内的元素。加入新的元素时,要调用push_back()/insert()函数。
resize是改变容器的大小,且在创建对象,因此,调用这个函数之后,就可以引用容器内的对象了,因此当加入新的元素时,用operator[]操作符,或者用迭代器来引用元素对象。此时再调用push_back()函数,是加在这个新的空间后面的。
vector在多线程中操作举例:
有一个全局变量 vector goods_list;
在A线程中从服务器获取最新商品列表,goods_list.push_back()
在B线程中不断的下载商品图片,
Goods &goods = goods_list.at(i)
读取goods .pic_url,下载完成后赋值 goods.local_pic = local_pic
以上简单的逻辑,缺导致程序崩溃,提示内存写入错误。
调试定位到goods.local_pic = local_pic这一句。
估计就是多线程的问题。
查了一下资料,原来vector每次push_back都会重新分配内存,导致goods 这个引用无效,所以goods.local_pic = local_pic赋值写入的时候就会写入到一个无效的地址,导致程序崩溃。
解决办法
加锁也可以解决这个问题,不过那样太低效了,不予考虑。最后的解决方案是,用vector的reserve方法预先分配好内存,免得在使用中动态增长。
在构造函数中提前对goods_list.reserve(30000)分配足够的固定内存,这样就不用每次pushback都申请增加内存、重新分配内存 导致的原内存地址无效,而且效率也高很多。
跨平台使用C++ vector的多线程问题
源起
最近碰到一个linux下程序崩溃的问题,涉及到vector的多线程使用的问题。由于是第二次折腾这个问题,所以把过程记录下来。
简单介绍一下背景:程序为windows和linux跨平台使用。使用一套代码,会分别编译两个平台下的不同版本。
程序涉及的结构。使用了一个全局变量的vector来保存数据,有两个线程,一个线程是周期执行的,每个周期开始时检查全局变量中是否有数据,如果有就取出来处理,然后清空。另一个线程等待外部输入数据,如果有数据就放进vector。
示例代码如下:
#include "stdafx.h" #include <vector> typedef void *(TASK_ENTRY_POINT)(void *); const unsigned short Task_Priority_Base = 50; // 基本优先级 #if defined WIN32 // windows 操作系统 typedef void * SIGNAL_HANDLE; #else // 标准linux 操作系统 #include <semaphore.h> typedef sem_t * SIGNAL_HANDLE; #endif void SleepUs(unsigned long us) { #if defined WIN32 // windows 操作系统 ::Sleep(us); #else // 标准linux 操作系统 if (us>60000) // > 1min sleep(us/1000000); else usleep(us); #endif } void SleepMs(unsigned long Ms) { #if defined WIN32 // windows 操作系统 ::Sleep(Ms); #else // 标准linux 操作系统 if (Ms>60000) // > 1min sleep(Ms/1000); else usleep(Ms*1000); #endif } static int running_tasks = 0; void CreateTask(TASK_ENTRY_POINT EntryFunc,int Priority=0, void *ArgP = NULL); typedef std::vector<int > vectorType; vectorType testvec; bool init_task_library() { running_tasks = 0; int St=0; // 设置调度策略或基本优先级 #if defined WIN32 // windows 操作系统 SetPriorityClass(GetCurrentProcess(),HIGH_PRIORITY_CLASS); #else // 标准linux 操作系统 sched_param Param; int PriorityMax,PriorityMin; // 禁止内存交换 // St=mlockall(MCL_CURRENT|MCL_FUTURE); // 设置优先级 PriorityMax = sched_get_priority_max(SCHED_RR); PriorityMin = sched_get_priority_min(SCHED_RR); if (Task_Priority_Base>PriorityMax) Param.__sched_priority = PriorityMax; else Param.__sched_priority = Task_Priority_Base; St=sched_setscheduler(0,SCHED_RR,&Param); #endif return true; } void CreateTask(TASK_ENTRY_POINT EntryFunc,int Priority, void *ArgP) { static bool LibInit=false; if (!LibInit) LibInit=init_task_library(); assert(LibInit); assert(EntryFunc); #if defined WIN32 // windows 操作系统 HANDLE Tid; Tid=(HANDLE)::_beginthread((void (*)(void *))EntryFunc, 0, ArgP); ::SetThreadPriority(Tid,Priority); #else // 标准linux 操作系统 int St; pthread_t Tid; sched_param Param; int Policy; // 创建线程 St = pthread_create(&Tid,NULL,EntryFunc,ArgP); assert(St==0); pthread_detach(Tid); // 设置线程参数 Policy = SCHED_RR; Param.__sched_priority=Task_Priority_Base + Priority; St=pthread_setschedparam(Tid,Policy,&Param); #endif ++running_tasks; } void *addElement(void *ArgP) { int i=0; for(int i=0;i<10000;i++) { int res=i; for(int j=0;j<1000;j++) { testvec.push_back(res); } printf("addelement %d \n",res); SleepMs(100); } printf("addelement done\n"); return NULL; } void *clearElement(void *ArgP) { int i=0; for(int i=0;i<100000000;i++) { if(testvec.size()>0) { for(vectorType::iterator ite=testvec.begin(); ite!=testvec.end(); ++ite)//崩点1 { { printf("get %d size=%d begin\n",(*ite),testvec.size());//,&*(testvec.begin()));//崩点2 } } printf("clear\n"); testvec.clear();//崩点3 } SleepMs(1); } printf("clearelement done\n"); return NULL; } int main(int argc, char* argv[]) { CreateTask(addElement); CreateTask(clearElement); printf("done\n"); while(1) { SleepMs(10000); } return 0; }
reserve问题
真实代码中周期执行的线程大部分时间在sleep,而接收数据的线程也在很少的情况下才会收到数据,因此运行了很长时间也没有出现问题。但是示例代码中,不管是linux还是windows下,却是一跑就崩的。在window下报“vector iterators incompatible” ,在linux下直接segfault。
首先说的是reserve问题。
vector的内存是动态分配的,因此只要vector的大小超过了当前的大小,就会重新开辟一块新的内存,大小为现在大小的两倍,这就导致了如果大小涨了的话,内存会变化的。而如果一个线程里在一直写,另一个线程里用iterator来读,那么如果第一个线程里内存已经变了,读的线程还用原来的地址,就会导致程序崩溃。
这个问题还是比较好解决的。就是首先为vector保留内存大小。使用reserve函数
对代码的改进:
main函数改为:
int main(int argc, char* argv[]) { char name[1024]; sprintf(name, "testSyncMutex"); testMutex = CreateTrigger(name); FireTrigger(testMutex); testvec.reserve(10000000); printf("hello world\n"); CreateTask(addElement); CreateTask(clearElement); printf("done\n"); while(1) { SleepMs(10000); } return 0; }
clear问题
经过了reserve的修改,生产环境的代码大概率不会崩了,但是小概率事件在大基数面前也会出现。程序还是崩了。再次检视了生产代码,觉得应该加个锁了。
vector并不是线程安全的,所以,虽然生产环境下概率比较小,但是仍然是存在漏洞的。就比如示例代码,仍然会崩。
一个线程里写,另一个线程里读,而且可能崩在任何读的地方(代码注释崩点1~崩点3)。
那么下面就是怎么改了,通过信号量来加锁。增加函数
SIGNAL_HANDLE testMutex; // 创建信号灯 SIGNAL_HANDLE CreateTrigger(const char* SigName) { assert(SigName); #if defined WIN32 // windows 操作系统 return ::CreateSemaphoreA(NULL,0,1,SigName); /*return ::CreateEvent( NULL, // no security attributes FALSE, // auto reset FALSE, // initially not signaled SigName); // name of mutex */ #else // 标准linux 操作系统 sem_t * SemP; SemP=new sem_t; assert(SemP); int St=sem_init(SemP,0,0); // 线程间共享,初值为0 assert(St != -1); return SemP; // return sem_open(SigName,O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH,0); #endif } // 释放信号灯 void FreeTrigger(SIGNAL_HANDLE Handle) { assert(Handle); #if defined WIN32 // windows 操作系统 ::CloseHandle(Handle); #else // 标准linux 操作系统 sem_destroy(Handle); delete Handle; //sem_close(Handle); #endif } // 触发信号灯 void FireTrigger(SIGNAL_HANDLE Handle) { assert(Handle); #if defined WIN32 // windows 操作系统 //::SetEvent(Handle); ReleaseSemaphore (Handle,1,NULL); #else // 标准linux 操作系统 sem_post(Handle); #endif } // 等待信号灯 void WaitTrigger(SIGNAL_HANDLE Handle) { assert(Handle); #if defined WIN32 // windows 操作系统 ::WaitForSingleObject(Handle, INFINITE); #else // 标准linux 操作系统 sem_wait(Handle); #endif }
main函数改为:
int main(int argc, char* argv[]) { char name[1024]; sprintf(name, "testSyncMutex"); testMutex = CreateTrigger(name); FireTrigger(testMutex); testvec.reserve(10000000); CreateTask(addElement); CreateTask(clearElement); while(1) { SleepMs(10000); } return 0; }
addElement函数改为
void *addElement(void *ArgP) { int i=0; int res=0; for(int i=0;i<10000;i++) { for(int j=0;j<1000;j++) { WaitTrigger(testMutex); res=i*1000+j; testvec.push_back(res); FireTrigger(testMutex); } SleepMs(100); } return NULL; }
clearElement中有两种改法,一种是对整个循环加锁。
代码如下:
void *clearElement(void *ArgP) { int i=0; for(int i=0;i<100000000;i++) { i++; if(testvec.size()>0) { WaitTrigger(testMutex); for(vectorType::iterator ite=testvec.begin(); ite!=testvec.end(); ++ite) { printf("get %d size=%d it%x\n",(*ite),testvec.size(),&*ite); } testvec.clear(); FireTrigger(testMutex); } SleepMs(1); } return NULL; }
但是这种做法的副作用也很明显,如果读的线程中执行的操作较多或需要执行的数据条数较多,可能会占用写线程中的执行时间。
另一种改法是加锁的位置更加分散,如下:
void *clearElement(void *ArgP) { int i=0; for(int i=0;i<100000000;i++) { if(testvec.size()>0) { WaitTrigger(testMutex); for(vectorType::iterator ite=testvec.begin(); ite!=testvec.end(); WaitTrigger(testMutex),++ite) { printf("get %d size=%d it%x\n",(*ite),testvec.size(),&*ite); FireTrigger(testMutex); } testvec.clear(); FireTrigger(testMutex); } SleepMs(1); } return NULL; }
注意
最后用这种方法修复了错误。一个感受就是,墨菲定律。程序中可能出错的地方,一定会出错。所以在用到非线程安全的容器时一定要注意加保护。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。