C 语言

关注公众号 jb51net

关闭
首页 > 软件编程 > C 语言 > C++ vector多线程操作内存错误

C++ vector在多线程操作中出现内存错误问题及解决

作者:mhrobot

这篇文章主要介绍了C++ vector在多线程操作中出现内存错误问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

vector在多线程操作中出现内存错误问题

C++ vector的reserve和resize详解

reserve是容器预留空间,但在空间内不真正创建元素对象,所以在没有添加新的对象之前,不能引用容器内的元素。加入新的元素时,要调用push_back()/insert()函数。

resize是改变容器的大小,且在创建对象,因此,调用这个函数之后,就可以引用容器内的对象了,因此当加入新的元素时,用operator[]操作符,或者用迭代器来引用元素对象。此时再调用push_back()函数,是加在这个新的空间后面的。

vector在多线程中操作举例:

有一个全局变量 vector goods_list;

在A线程中从服务器获取最新商品列表,goods_list.push_back()

在B线程中不断的下载商品图片,

Goods &goods = goods_list.at(i)

读取goods .pic_url,下载完成后赋值 goods.local_pic = local_pic

以上简单的逻辑,缺导致程序崩溃,提示内存写入错误。

调试定位到goods.local_pic = local_pic这一句。

估计就是多线程的问题。

查了一下资料,原来vector每次push_back都会重新分配内存,导致goods 这个引用无效,所以goods.local_pic = local_pic赋值写入的时候就会写入到一个无效的地址,导致程序崩溃。

解决办法

加锁也可以解决这个问题,不过那样太低效了,不予考虑。最后的解决方案是,用vector的reserve方法预先分配好内存,免得在使用中动态增长。

在构造函数中提前对goods_list.reserve(30000)分配足够的固定内存,这样就不用每次pushback都申请增加内存、重新分配内存 导致的原内存地址无效,而且效率也高很多。

跨平台使用C++ vector的多线程问题

源起

最近碰到一个linux下程序崩溃的问题,涉及到vector的多线程使用的问题。由于是第二次折腾这个问题,所以把过程记录下来。

简单介绍一下背景:程序为windows和linux跨平台使用。使用一套代码,会分别编译两个平台下的不同版本。

程序涉及的结构。使用了一个全局变量的vector来保存数据,有两个线程,一个线程是周期执行的,每个周期开始时检查全局变量中是否有数据,如果有就取出来处理,然后清空。另一个线程等待外部输入数据,如果有数据就放进vector。

示例代码如下:

#include "stdafx.h"
#include <vector>
typedef void *(TASK_ENTRY_POINT)(void *);
const unsigned short   Task_Priority_Base    = 50;        // 基本优先级
#if defined WIN32        // windows 操作系统
    typedef void * SIGNAL_HANDLE;
#else                    // 标准linux 操作系统
    #include <semaphore.h>    
    typedef sem_t * SIGNAL_HANDLE;
#endif
    void SleepUs(unsigned long us)
    {
#if defined WIN32                // windows 操作系统
        ::Sleep(us);
#else                            // 标准linux 操作系统
        if (us>60000)                // > 1min
            sleep(us/1000000);
        else
            usleep(us);
#endif
    }
void SleepMs(unsigned long Ms)
{
#if defined WIN32                // windows 操作系统
        ::Sleep(Ms);
#else                            // 标准linux 操作系统
    if (Ms>60000)                // > 1min
        sleep(Ms/1000);
    else
        usleep(Ms*1000);
#endif
}
static int running_tasks = 0;
void CreateTask(TASK_ENTRY_POINT EntryFunc,int Priority=0, void *ArgP = NULL);
typedef std::vector<int >    vectorType;
vectorType testvec;
bool init_task_library()
{
    running_tasks = 0;
    int St=0;
    // 设置调度策略或基本优先级
#if defined WIN32                // windows 操作系统
    SetPriorityClass(GetCurrentProcess(),HIGH_PRIORITY_CLASS);
#else                            // 标准linux 操作系统
    sched_param    Param;
    int            PriorityMax,PriorityMin;
    // 禁止内存交换
//    St=mlockall(MCL_CURRENT|MCL_FUTURE);
    // 设置优先级
    PriorityMax    = sched_get_priority_max(SCHED_RR);
    PriorityMin    = sched_get_priority_min(SCHED_RR);
    if (Task_Priority_Base>PriorityMax)
        Param.__sched_priority = PriorityMax;
    else
        Param.__sched_priority = Task_Priority_Base;
    St=sched_setscheduler(0,SCHED_RR,&Param);
#endif
    return true;
}
void CreateTask(TASK_ENTRY_POINT EntryFunc,int Priority, void *ArgP)
{
    static bool LibInit=false;
    if (!LibInit)
        LibInit=init_task_library();
    assert(LibInit);
    assert(EntryFunc);
#if defined WIN32                // windows 操作系统
    HANDLE Tid;
    Tid=(HANDLE)::_beginthread((void (*)(void *))EntryFunc, 0, ArgP);
    ::SetThreadPriority(Tid,Priority);
#else                            // 标准linux 操作系统
    int St;
    pthread_t         Tid;
    sched_param        Param;
    int                Policy;
    // 创建线程
    St = pthread_create(&Tid,NULL,EntryFunc,ArgP);
    assert(St==0);
    pthread_detach(Tid);
    // 设置线程参数
    Policy = SCHED_RR;
    Param.__sched_priority=Task_Priority_Base + Priority;
    St=pthread_setschedparam(Tid,Policy,&Param);
#endif
    ++running_tasks;
}
void *addElement(void *ArgP)
{
    int i=0;
    for(int i=0;i<10000;i++)
    {
        int res=i;
        for(int j=0;j<1000;j++)
        {
            testvec.push_back(res);
        }
        printf("addelement %d \n",res);
        SleepMs(100);
    }
    printf("addelement done\n");
    return NULL;
}
void *clearElement(void *ArgP)
{
    int i=0;
    for(int i=0;i<100000000;i++)
    {
        if(testvec.size()>0)
        {
            for(vectorType::iterator ite=testvec.begin(); ite!=testvec.end(); ++ite)//崩点1
            {
                {
                    printf("get %d size=%d begin\n",(*ite),testvec.size());//,&*(testvec.begin()));//崩点2
                }
            }
            printf("clear\n");
            testvec.clear();//崩点3
        }
        SleepMs(1);
    }
    printf("clearelement done\n");
    return NULL;
}
int main(int argc, char* argv[])
{
    CreateTask(addElement);
    CreateTask(clearElement);
    printf("done\n");
    while(1)
    {
        SleepMs(10000);
    }
    return 0;
}

reserve问题

真实代码中周期执行的线程大部分时间在sleep,而接收数据的线程也在很少的情况下才会收到数据,因此运行了很长时间也没有出现问题。但是示例代码中,不管是linux还是windows下,却是一跑就崩的。在window下报“vector iterators incompatible” ,在linux下直接segfault。

首先说的是reserve问题。

vector的内存是动态分配的,因此只要vector的大小超过了当前的大小,就会重新开辟一块新的内存,大小为现在大小的两倍,这就导致了如果大小涨了的话,内存会变化的。而如果一个线程里在一直写,另一个线程里用iterator来读,那么如果第一个线程里内存已经变了,读的线程还用原来的地址,就会导致程序崩溃。

这个问题还是比较好解决的。就是首先为vector保留内存大小。使用reserve函数

对代码的改进:

main函数改为:

int main(int argc, char* argv[])
{
    char name[1024];
    sprintf(name, "testSyncMutex");
    testMutex = CreateTrigger(name);
    FireTrigger(testMutex);
    testvec.reserve(10000000);
    printf("hello world\n");
    CreateTask(addElement);
    CreateTask(clearElement);
    printf("done\n");
    while(1)
    {
        SleepMs(10000);
    }
    return 0;
}

clear问题

经过了reserve的修改,生产环境的代码大概率不会崩了,但是小概率事件在大基数面前也会出现。程序还是崩了。再次检视了生产代码,觉得应该加个锁了。

vector并不是线程安全的,所以,虽然生产环境下概率比较小,但是仍然是存在漏洞的。就比如示例代码,仍然会崩。

一个线程里写,另一个线程里读,而且可能崩在任何读的地方(代码注释崩点1~崩点3)。

那么下面就是怎么改了,通过信号量来加锁。增加函数

SIGNAL_HANDLE    testMutex;
// 创建信号灯
SIGNAL_HANDLE    CreateTrigger(const char* SigName)
{
    assert(SigName);
#if defined WIN32                // windows 操作系统
    return ::CreateSemaphoreA(NULL,0,1,SigName);
    /*return ::CreateEvent(    NULL,       // no security attributes
                            FALSE,        // auto reset
                            FALSE,      // initially not signaled
                            SigName);    // name of mutex
    */
#else                                    // 标准linux 操作系统
    sem_t * SemP;
    SemP=new sem_t;
    assert(SemP);
    int St=sem_init(SemP,0,0);            // 线程间共享,初值为0
    assert(St != -1);
    return SemP;
//    return sem_open(SigName,O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH,0);
#endif
}
// 释放信号灯
void    FreeTrigger(SIGNAL_HANDLE Handle)
{
    assert(Handle);
#if defined WIN32                        // windows 操作系统
    ::CloseHandle(Handle);
#else                                    // 标准linux 操作系统
    sem_destroy(Handle);
    delete Handle;
    //sem_close(Handle);
#endif
}
// 触发信号灯
void    FireTrigger(SIGNAL_HANDLE Handle)
{
    assert(Handle);
#if defined WIN32                // windows 操作系统
    //::SetEvent(Handle);
    ReleaseSemaphore (Handle,1,NULL);
#else                            // 标准linux 操作系统
    sem_post(Handle);
#endif
}
// 等待信号灯
void WaitTrigger(SIGNAL_HANDLE Handle)
{
    assert(Handle);
#if defined WIN32                // windows 操作系统
    ::WaitForSingleObject(Handle, INFINITE);
#else                            // 标准linux 操作系统
    sem_wait(Handle);
#endif
}

main函数改为:

int main(int argc, char* argv[])
{
    char name[1024];
    sprintf(name, "testSyncMutex");
    testMutex = CreateTrigger(name);
    FireTrigger(testMutex);
    testvec.reserve(10000000);
    CreateTask(addElement);
    CreateTask(clearElement);
    while(1)
    {
        SleepMs(10000);
    }
    return 0;
}

addElement函数改为

void *addElement(void *ArgP)
{
    int i=0;
    int res=0;
    for(int i=0;i<10000;i++)
    {
        for(int j=0;j<1000;j++)
        {
            WaitTrigger(testMutex);
            res=i*1000+j;
            testvec.push_back(res);
            FireTrigger(testMutex);
        }
        SleepMs(100);
    }
    return NULL;
}

clearElement中有两种改法,一种是对整个循环加锁。

代码如下:

void *clearElement(void *ArgP)
{
    int i=0;
    for(int i=0;i<100000000;i++)
    {
        i++;
        if(testvec.size()>0)
        {
        WaitTrigger(testMutex);
            for(vectorType::iterator ite=testvec.begin(); ite!=testvec.end(); ++ite)
            {
                    printf("get %d size=%d it%x\n",(*ite),testvec.size(),&*ite);
            }
            testvec.clear();
            FireTrigger(testMutex);
        }
        SleepMs(1);
    }
    return NULL;
}

但是这种做法的副作用也很明显,如果读的线程中执行的操作较多或需要执行的数据条数较多,可能会占用写线程中的执行时间。

另一种改法是加锁的位置更加分散,如下:

void *clearElement(void *ArgP)
{
    int i=0;
    for(int i=0;i<100000000;i++)
    {
        if(testvec.size()>0)
        {
         WaitTrigger(testMutex);
            for(vectorType::iterator ite=testvec.begin(); ite!=testvec.end(); WaitTrigger(testMutex),++ite)
            {
                    printf("get %d size=%d it%x\n",(*ite),testvec.size(),&*ite);
                    FireTrigger(testMutex);
            }
            testvec.clear();
            FireTrigger(testMutex);
        }
        SleepMs(1);
    }
    return NULL;
}

注意

最后用这种方法修复了错误。一个感受就是,墨菲定律。程序中可能出错的地方,一定会出错。所以在用到非线程安全的容器时一定要注意加保护。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文