c++版线程池和任务池示例
作者:
commondef.h
//单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁
const int CHECK_IDLE_TASK_INTERVAL = 300;
//单位秒,任务自动销毁时间间隔
const int TASK_DESTROY_INTERVAL = 60;
//监控线程池是否为空时间间隔,微秒
const int IDLE_CHECK_POLL_EMPTY = 500;
//线程池线程空闲自动退出时间间隔 ,5分钟
const int THREAD_WAIT_TIME_OUT = 300;
taskpool.cpp
#include "taskpool.h"
#include <string.h>
#include <stdio.h>
#include <pthread.h>
TaskPool::TaskPool(const int & poolMaxSize)
: m_poolSize(poolMaxSize)
, m_taskListSize(0)
, m_bStop(false)
{
pthread_mutex_init(&m_lock, NULL);
pthread_mutex_init(&m_idleMutex, NULL);
pthread_cond_init(&m_idleCond, NULL);
pthread_attr_t attr;
pthread_attr_init( &attr );
pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // 让线程独立运行
pthread_create(&m_idleId, &attr, CheckIdleTask, this); //创建监测空闲任务进程
pthread_attr_destroy(&attr);
}
TaskPool::~TaskPool()
{
if(!m_bStop)
{
StopPool();
}
if(!m_taskList.empty())
{
std::list<Task*>::iterator it = m_taskList.begin();
for(; it != m_taskList.end(); ++it)
{
if(*it != NULL)
{
delete *it;
*it = NULL;
}
}
m_taskList.clear();
m_taskListSize = 0;
}
if(!m_idleList.empty())
{
std::list<Task*>::iterator it = m_idleList.begin();
for(; it != m_idleList.end(); ++it)
{
if(*it != NULL)
{
delete *it;
*it = NULL;
}
}
m_idleList.clear();
}
pthread_mutex_destroy(&m_lock);
pthread_mutex_destroy(&m_idleMutex);
pthread_cond_destroy(&m_idleCond);
}
void * TaskPool::CheckIdleTask(void * arg)
{
TaskPool * pool = (TaskPool*)arg;
while(1)
{
pool->LockIdle();
pool->RemoveIdleTask();
if(pool->GetStop())
{
pool->UnlockIdle();
break;
}
pool->CheckIdleWait();
pool->UnlockIdle();
}
}
void TaskPool::StopPool()
{
m_bStop = true;
LockIdle();
pthread_cond_signal(&m_idleCond); //防止监控线程正在等待,而引起无法退出的问题
UnlockIdle();
pthread_join(m_idleId, NULL);
}
bool TaskPool::GetStop()
{
return m_bStop;
}
void TaskPool::CheckIdleWait()
{
struct timespec timeout;
memset(&timeout, 0, sizeof(timeout));
timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;
timeout.tv_nsec = 0;
pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout);
}
int TaskPool::RemoveIdleTask()
{
int iRet = 0;
std::list<Task*>::iterator it, next;
std::list<Task*>::reverse_iterator rit = m_idleList.rbegin();
time_t curTime = time(0);
for(; rit != m_idleList.rend(); )
{
it = --rit.base();
if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)
{
iRet++;
delete *it;
*it = NULL;
next = m_idleList.erase(it);
rit = std::list<Task*>::reverse_iterator(next);
}
else
{
break;
}
}
}
int TaskPool::AddTask(task_fun fun, void *arg)
{
int iRet = 0;
if(0 != fun)
{
pthread_mutex_lock(&m_lock);
if(m_taskListSize >= m_poolSize)
{
pthread_mutex_unlock(&m_lock);
iRet = -1; //task pool is full;
}
else
{
pthread_mutex_unlock(&m_lock);
Task * task = GetIdleTask();
if(NULL == task)
{
task = new Task;
}
if(NULL == task)
{
iRet = -2; // new failed
}
else
{
task->fun = fun;
task->data = arg;
pthread_mutex_lock(&m_lock);
m_taskList.push_back(task);
++m_taskListSize;
pthread_mutex_unlock(&m_lock);
}
}
}
return iRet;
}
Task* TaskPool::GetTask()
{
Task *task = NULL;
pthread_mutex_lock(&m_lock);
if(!m_taskList.empty())
{
task = m_taskList.front();
m_taskList.pop_front();
--m_taskListSize;
}
pthread_mutex_unlock(&m_lock);
return task;
}
void TaskPool::LockIdle()
{
pthread_mutex_lock(&m_idleMutex);
}
void TaskPool::UnlockIdle()
{
pthread_mutex_unlock(&m_idleMutex);
}
Task * TaskPool::GetIdleTask()
{
LockIdle();
Task * task = NULL;
if(!m_idleList.empty())
{
task = m_idleList.front();
m_idleList.pop_front();
}
UnlockIdle();
return task;
}
void TaskPool::SaveIdleTask(Task*task)
{
if(NULL != task)
{
task->fun = 0;
task->data = NULL;
task->last_time = time(0);
LockIdle();
m_idleList.push_front(task);
UnlockIdle();
}
}
taskpool.h
#ifndef TASKPOOL_H
#define TASKPOOL_H
/* purpose @ 任务池,主要是缓冲外部高并发任务数,有manager负责调度任务
* 任务池可自动销毁长时间空闲的Task对象
* 可通过CHECK_IDLE_TASK_INTERVAL设置检查idle空闲进程轮训等待时间
* TASK_DESTROY_INTERVAL 设置Task空闲时间,超过这个时间值将会被CheckIdleTask线程销毁
* date @ 2013.12.23
* author @ haibin.wang
*/
#include <list>
#include <pthread.h>
#include "commondef.h"
//所有的用户操作为一个task,
typedef void (*task_fun)(void *);
struct Task
{
task_fun fun; //任务处理函数
void* data; //任务处理数据
time_t last_time; //加入空闲队列的时间,用于自动销毁
};
//任务池,所有任务会投递到任务池中,管理线程负责将任务投递给线程池
class TaskPool
{
public:
/* pur @ 初始化任务池,启动任务池空闲队列自动销毁线程
* para @ maxSize 最大任务数,大于0
*/
TaskPool(const int & poolMaxSize);
~TaskPool();
/* pur @ 添加任务到任务队列的尾部
* para @ task, 具体任务
* return @ 0 添加成功,负数 添加失败
*/
int AddTask(task_fun fun, void* arg);
/* pur @ 从任务列表的头获取一个任务
* return @ 如果列表中有任务则返回一个Task指针,否则返回一个NULL
*/
Task* GetTask();
/* pur @ 保存空闲任务到空闲队列中
* para @ task 已被调用执行的任务
* return @
*/
void SaveIdleTask(Task*task);
void StopPool();
public:
void LockIdle();
void UnlockIdle();
void CheckIdleWait();
int RemoveIdleTask();
bool GetStop();
private:
static void * CheckIdleTask(void *);
/* pur @ 获取空闲的task
* para @
* para @
* return @ NULL说明没有空闲的,否则从m_idleList中获取一个
*/
Task* GetIdleTask();
int GetTaskSize();
private:
int m_poolSize; //任务池大小
int m_taskListSize; // 统计taskList的大小,因为当List的大小会随着数量的增多而耗时增加
bool m_bStop; //是否停止
std::list<Task*> m_taskList;//所有待处理任务列表
std::list<Task*> m_idleList;//所有空闲任务列表
pthread_mutex_t m_lock; //对任务列表进行加锁,保证每次只能取一个任务
pthread_mutex_t m_idleMutex; //空闲任务队列锁
pthread_cond_t m_idleCond; //空闲队列等待条件
pthread_t m_idleId;;
};
#endif
threadpool.cpp
/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)
* date @ 2014.01.03
* author @ haibin.wang
*/
#include "threadpool.h"
#include <errno.h>
#include <string.h>
/*
#include <iostream>
#include <stdio.h>
*/
Thread::Thread(bool detach, ThreadPool * pool)
: m_pool(pool)
{
pthread_attr_init(&m_attr);
if(detach)
{
pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_DETACHED ); // 让线程独立运行
}
else
{
pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE );
}
pthread_mutex_init(&m_mutex, NULL); //初始化互斥量
pthread_cond_init(&m_cond, NULL); //初始化条件变量
task.fun = 0;
task.data = NULL;
}
Thread::~Thread()
{
pthread_cond_destroy(&m_cond);
pthread_mutex_destroy(&m_mutex);
pthread_attr_destroy(&m_attr);
}
ThreadPool::ThreadPool()
: m_poolMax(0)
, m_idleNum(0)
, m_totalNum(0)
, m_bStop(false)
{
pthread_mutex_init(&m_mutex, NULL);
pthread_mutex_init(&m_runMutex,NULL);
pthread_mutex_init(&m_terminalMutex, NULL);
pthread_cond_init(&m_terminalCond, NULL);
pthread_cond_init(&m_emptyCond, NULL);
}
ThreadPool::~ThreadPool()
{
/*if(!m_threads.empty())
{
std::list<Thread*>::iterator it = m_threads.begin();
for(; it != m_threads.end(); ++it)
{
if(*it != NULL)
{
pthread_cond_destroy( &((*it)->m_cond) );
pthread_mutex_destroy( &((*it)->m_mutex) );
delete *it;
*it = NULL;
}
}
m_threads.clear();
}*/
pthread_mutex_destroy(&m_runMutex);
pthread_mutex_destroy(&m_terminalMutex);
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_terminalCond);
pthread_cond_destroy(&m_emptyCond);
}
int ThreadPool::InitPool(const int & poolMax, const int & poolPre)
{
if(poolMax < poolPre
|| poolPre < 0
|| poolMax <= 0)
{
return -1;
}
m_poolMax = poolMax;
int iRet = 0;
for(int i=0; i<poolPre; ++i)
{
Thread * thread = CreateThread();
if(NULL == thread)
{
iRet = -2;
}
}
if(iRet < 0)
{
std::list<Thread*>::iterator it = m_threads.begin();
for(; it!= m_threads.end(); ++it)
{
if(NULL != (*it) )
{
delete *it;
*it = NULL;
}
}
m_threads.clear();
m_totalNum = 0;
}
return iRet;
}
void ThreadPool::GetThreadRun(task_fun fun, void* arg)
{
//从线程池中获取一个线程
pthread_mutex_lock( &m_mutex);
if(m_threads.empty())
{
pthread_cond_wait(&m_emptyCond,&m_mutex); //阻塞等待有空闲线程
}
Thread * thread = m_threads.front();
m_threads.pop_front();
pthread_mutex_unlock( &m_mutex);
pthread_mutex_lock( &thread->m_mutex );
thread->task.fun = fun;
thread->task.data = arg;
pthread_cond_signal(&thread->m_cond); //触发线程WapperFun循环执行
pthread_mutex_unlock( &thread->m_mutex );
}
int ThreadPool::Run(task_fun fun, void * arg)
{
pthread_mutex_lock(&m_runMutex); //保证每次只能由一个线程执行
int iRet = 0;
if(m_totalNum <m_poolMax) //
{
if(m_threads.empty() && (NULL == CreateThread()) )
{
iRet = -1;//can not create new thread!
}
else
{
GetThreadRun(fun, arg);
}
}
else
{
GetThreadRun(fun, arg);
}
pthread_mutex_unlock(&m_runMutex);
return iRet;
}
void ThreadPool::StopPool(bool bStop)
{
m_bStop = bStop;
if(bStop)
{
//启动监控所有空闲线程是否退出的线程
Thread thread(false, this);
pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程
//阻塞等待所有空闲线程退出
pthread_join(thread.m_threadId, NULL);
}
/*if(bStop)
{
pthread_mutex_lock(&m_terminalMutex);
//启动监控所有空闲线程是否退出的线程
Thread thread(true, this);
pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程
//阻塞等待所有空闲线程退出
pthread_cond_wait(&m_terminalCond, & m_terminalMutex);
pthread_mutex_unlock(&m_terminalMutex);
}*/
}
bool ThreadPool::GetStop()
{
return m_bStop;
}
Thread * ThreadPool::CreateThread()
{
Thread * thread = NULL;
thread = new Thread(true, this);
if(NULL != thread)
{
int iret = pthread_create(&thread->m_threadId,&thread->m_attr, ThreadPool::WapperFun , thread); //通过WapperFun将线程加入到空闲队列中
if(0 != iret)
{
delete thread;
thread = NULL;
}
}
return thread;
}
void * ThreadPool::WapperFun(void*arg)
{
Thread * thread = (Thread*)arg;
if(NULL == thread || NULL == thread->m_pool)
{
return NULL;
}
ThreadPool * pool = thread->m_pool;
pool->IncreaseTotalNum();
struct timespec abstime;
memset(&abstime, 0, sizeof(abstime));
while(1)
{
if(0 != thread->task.fun)
{
thread->task.fun(thread->task.data);
}
if( true == pool->GetStop() )
{
break; //确定当前任务执行完毕后再判定是否退出线程
}
pthread_mutex_lock( &thread->m_mutex );
pool->SaveIdleThread(thread); //将线程加入到空闲队列中
abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;
abstime.tv_nsec = 0;
if(ETIMEDOUT == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //等待线程被唤醒 或超时自动退出
{
pthread_mutex_unlock( &thread->m_mutex );
break;
}
pthread_mutex_unlock( &thread->m_mutex );
}
pool->LockMutex();
pool->DecreaseTotalNum();
if(thread != NULL)
{
pool->RemoveThread(thread);
delete thread;
thread = NULL;
}
pool->UnlockMutex();
return 0;
}
void ThreadPool::SaveIdleThread(Thread * thread )
{
if(thread)
{
thread->task.fun = 0;
thread->task.data = NULL;
LockMutex();
if(m_threads.empty())
{
pthread_cond_broadcast(&m_emptyCond); //发送不空的信号,告诉run函数线程队列已经不空了
}
m_threads.push_front(thread);
UnlockMutex();
}
}
int ThreadPool::TotalThreads()
{
return m_totalNum;
}
void ThreadPool::SendSignal()
{
LockMutex();
std::list<Thread*>::iterator it = m_threads.begin();
for(; it!= m_threads.end(); ++it)
{
pthread_mutex_lock( &(*it)->m_mutex );
pthread_cond_signal(&((*it)->m_cond));
pthread_mutex_unlock( &(*it)->m_mutex );
}
UnlockMutex();
}
void * ThreadPool::TerminalCheck(void* arg)
{
Thread * thread = (Thread*)arg;
if(NULL == thread || NULL == thread->m_pool)
{
return NULL;
}
ThreadPool * pool = thread->m_pool;
while((false == pool->GetStop()) || pool->TotalThreads() >0 )
{
pool->SendSignal();
usleep(IDLE_CHECK_POLL_EMPTY);
}
//pool->TerminalCondSignal();
return 0;
}
void ThreadPool::TerminalCondSignal()
{
pthread_cond_signal(&m_terminalCond);
}
void ThreadPool::RemoveThread(Thread* thread)
{
m_threads.remove(thread);
}
void ThreadPool::LockMutex()
{
pthread_mutex_lock( &m_mutex);
}
void ThreadPool::UnlockMutex()
{
pthread_mutex_unlock( &m_mutex );
}
void ThreadPool::IncreaseTotalNum()
{
LockMutex();
m_totalNum++;
UnlockMutex();
}
void ThreadPool::DecreaseTotalNum()
{
m_totalNum--;
}
threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)a
* 当线程池退出时创建TerminalCheck线程,负责监测线程池所有线程退出
* date @ 2013.12.23
* author @ haibin.wang
*/
#include <list>
#include <string>
#include "taskpool.h"
//通过threadmanager来控制任务调度进程
//threadpool的TerminalCheck线程负责监测线程池所有线程退出
class ThreadPool;
class Thread
{
public:
Thread(bool detach, ThreadPool * pool);
~Thread();
pthread_t m_threadId; //线程id
pthread_mutex_t m_mutex; //互斥锁
pthread_cond_t m_cond; //条件变量
pthread_attr_t m_attr; //线程属性
Task task; //
ThreadPool * m_pool; //所属线程池
};
//线程池,负责创建线程处理任务,处理完毕后会将线程加入到空闲队列中,从任务池中
class ThreadPool
{
public:
ThreadPool();
~ThreadPool();
/* pur @ 初始化线程池
* para @ poolMax 线程池最大线程数
* para @ poolPre 预创建线程数
* return @ 0:成功
* -1: parameter error, must poolMax > poolPre >=0
* -2: 创建线程失败
*/
int InitPool(const int & poolMax, const int & poolPre);
/* pur @ 执行一个任务
* para @ task 任务指针
* return @ 0任务分配成功,负值 任务分配失败,-1,创建新线程失败
*/
int Run(task_fun fun, void* arg);
/* pur @ 设置是否停止线程池工作
* para @ bStop true停止,false不停止
*/
void StopPool(bool bStop);
public: //此公有函数主要用于静态函数调用
/* pur @ 获取进程池的启停状态
* return @
*/
bool GetStop();
void SaveIdleThread(Thread * thread );
void LockMutex();
void UnlockMutex();
void DecreaseTotalNum();
void IncreaseTotalNum();
void RemoveThread(Thread* thread);
void TerminalCondSignal();
int TotalThreads();
void SendSignal();
private:
/* pur @ 创建线程
* return @ 非空 成功,NULL失败,
*/
Thread * CreateThread();
/* pur @ 从线程池中获取一个一个线程运行任务
* para @ fun 函数指针
* para @ arg 函数参数
* return @
*/
void GetThreadRun(task_fun fun, void* arg);
static void * WapperFun(void*);
static void * TerminalCheck(void*);//循环监测是否所有线程终止线程
private:
int m_poolMax;//线程池最大线程数
int m_idleNum; //空闲线程数
int m_totalNum; //当前线程总数 小于最大线程数
bool m_bStop; //是否停止线程池
pthread_mutex_t m_mutex; //线程列表锁
pthread_mutex_t m_runMutex; //run函数锁
pthread_mutex_t m_terminalMutex; //终止所有线程互斥量
pthread_cond_t m_terminalCond; //终止所有线程条件变量
pthread_cond_t m_emptyCond; //空闲线程不空条件变量
std::list<Thread*> m_threads; // 线程列表
};
#endif
threadpoolmanager.cpp
#include "threadpoolmanager.h"
#include "threadpool.h"
#include "taskpool.h"
#include <errno.h>
#include <string.h>
/*#include <string.h>
#include <sys/time.h>
#include <stdio.h>*/
// struct timeval time_beg, time_end;
ThreadPoolManager::ThreadPoolManager()
: m_threadPool(NULL)
, m_taskPool(NULL)
, m_bStop(false)
{
pthread_mutex_init(&m_mutex_task,NULL);
pthread_cond_init(&m_cond_task, NULL);
/* memset(&time_beg, 0, sizeof(struct timeval));
memset(&time_end, 0, sizeof(struct timeval));
gettimeofday(&time_beg, NULL);*/
}
ThreadPoolManager::~ThreadPoolManager()
{
StopAll();
if(NULL != m_threadPool)
{
delete m_threadPool;
m_threadPool = NULL;
}
if(NULL != m_taskPool)
{
delete m_taskPool;
m_taskPool = NULL;
}
pthread_cond_destroy( &m_cond_task);
pthread_mutex_destroy( &m_mutex_task );
/*gettimeofday(&time_end, NULL);
long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
printf("manager total time = %d\n", total);
gettimeofday(&time_beg, NULL);*/
}
int ThreadPoolManager::Init(
const int &tastPoolSize,
const int &threadPoolMax,
const int &threadPoolPre)
{
m_threadPool = new ThreadPool();
if(NULL == m_threadPool)
{
return -1;
}
m_taskPool = new TaskPool(tastPoolSize);
if(NULL == m_taskPool)
{
return -2;
}
if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))
{
return -3;
}
//启动线程池
//启动任务池
//启动任务获取线程,从任务池中不断拿任务到线程池中
pthread_attr_t attr;
pthread_attr_init( &attr );
pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
pthread_create(&m_taskThreadId, &attr, TaskThread, this); //创建获取任务进程
pthread_attr_destroy(&attr);
return 0;
}
void ThreadPoolManager::StopAll()
{
m_bStop = true;
LockTask();
pthread_cond_signal(&m_cond_task);
UnlockTask();
pthread_join(m_taskThreadId, NULL);
//等待当前所有任务执行完毕
m_taskPool->StopPool();
m_threadPool->StopPool(true); // 停止线程池工作
}
void ThreadPoolManager::LockTask()
{
pthread_mutex_lock(&m_mutex_task);
}
void ThreadPoolManager::UnlockTask()
{
pthread_mutex_unlock(&m_mutex_task);
}
void* ThreadPoolManager::TaskThread(void* arg)
{
ThreadPoolManager * manager = (ThreadPoolManager*)arg;
while(1)
{
manager->LockTask(); //防止任务没有执行完毕发送了停止信号
while(1) //将任务队列中的任务执行完再退出
{
Task * task = manager->GetTaskPool()->GetTask();
if(NULL == task)
{
break;
}
else
{
manager->GetThreadPool()->Run(task->fun, task->data);
manager->GetTaskPool()->SaveIdleTask(task);
}
}
if(manager->GetStop())
{
manager->UnlockTask();
break;
}
manager->TaskCondWait(); //等待有任务的时候执行
manager->UnlockTask();
}
return 0;
}
ThreadPool * ThreadPoolManager::GetThreadPool()
{
return m_threadPool;
}
TaskPool * ThreadPoolManager::GetTaskPool()
{
return m_taskPool;
}
int ThreadPoolManager::Run(task_fun fun,void* arg)
{
if(0 == fun)
{
return 0;
}
if(!m_bStop)
{
int iRet = m_taskPool->AddTask(fun, arg);
if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) )
{
pthread_cond_signal(&m_cond_task);
UnlockTask();
}
return iRet;
}
else
{
return -3;
}
}
bool ThreadPoolManager::GetStop()
{
return m_bStop;
}
void ThreadPoolManager::TaskCondWait()
{
struct timespec to;
memset(&to, 0, sizeof to);
to.tv_sec = time(0) + 60;
to.tv_nsec = 0;
pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60秒超时
}
threadpoolmanager.h
#ifndef THREADPOOLMANAGER_H
#define THREADPOOLMANAGER_H
/* purpose @
* 基本流程:
* 管理线程池和任务池,先将任务加入任务池,然后由TaskThread负责从任务池中将任务取出放入到线程池中
* 基本功能:
* 1、工作线程可以在业务不忙的时候自动退出部分长时间不使用的线程
* 2、任务池可以在业务不忙的时候自动释放长时间不使用的资源(可通过commondef.h修改)
* 3、当程序退时不再向任务池中添加任务,当任务池中所有任务执行完毕后才退出相关程序(做到程序的安全退出)
* 线程资源:
* 如果不预分配任何处理线程的话,ThreadPool只有当有任务的时候才实际创建需要的线程,最大线程创建数为用户指定
* 当manager销毁的时候,manager会创建一个监控所有任务执行完毕的监控线程,只有当所有任务执行完毕后manager才销毁
* 线程最大数为:1个TaskPool线程 + 1个manager任务调度线程 + ThreadPool最大线程数 + 1个manager退出监控线程 + 1线程池所有线程退出监控线程
* 线程最小数为:1个TaskPool创建空闲任务资源销毁监控线程 + 1个manager创建任务调度线程
* 使用方法:
* ThreadPoolManager manager;
* manager.Init(100000, 50, 5);//初始化一个任务池为10000,线程池最大线程数50,预创建5个线程的管理器
* manager.run(fun, data); //添加执行任务到manager中,fun为函数指针,data为fun需要传入的参数,data可以为NULL
*
* date @ 2013.12.23
* author @ haibin.wang
*
* 详细参数控制可以修改commondef.h中的相关变量值
*/
#include <pthread.h>
typedef void (*task_fun)(void *);
class ThreadPool;
class TaskPool;
class ThreadPoolManager
{
public:
ThreadPoolManager();
~ThreadPoolManager();
/* pur @ 初始化线程池与任务池,threadPoolMax > threadPoolPre > threadPoolMin >= 0
* para @ tastPoolSize 任务池大小
* para @ threadPoolMax 线程池最大线程数
* para @ threadPoolPre 预创建线程数
* return @ 0:初始化成功,负数 初始化失败
* -1:创建线程池失败
* -2:创建任务池失败
* -3:线程池初始化失败
*/
int Init(const int &tastPoolSize,
const int &threadPoolMax,
const int &threadPoolPre);
/* pur @ 执行一个任务
* para @ fun 需要执行的函数指针
* para @ arg fun需要的参数,默认为NULL
* return @ 0 任务分配成功,负数 任务分配失败
* -1:任务池满
* -2:任务池new失败
* -3:manager已经发送停止信号,不再接收新任务
*/
int Run(task_fun fun,void* arg=NULL);
public: //以下public函数主要用于静态函数调用
bool GetStop();
void TaskCondWait();
TaskPool * GetTaskPool();
ThreadPool * GetThreadPool();
void LockTask();
void UnlockTask();
void LockFull();
private:
static void * TaskThread(void*); //任务处理线程
void StopAll();
private:
ThreadPool *m_threadPool; //线程池
TaskPool * m_taskPool; //任务池
bool m_bStop; // 是否终止管理器
pthread_t m_taskThreadId; // TaskThread线程id
pthread_mutex_t m_mutex_task;
pthread_cond_t m_cond_task;
};
#endif
main.cpp
#include <iostream>
#include <string>
#include "threadpoolmanager.h"
#include <sys/time.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
using namespace std;
int seq = 0;
int billNum =0;
int inter = 1;
pthread_mutex_t m_mutex;
void myFunc(void*arg)
{
pthread_mutex_lock(&m_mutex);
seq++;
if(seq%inter == 0 )
{
cout << "fun 1=" << seq << endl;
}
if(seq>=1000000000)
{
cout << "billion" << endl;
seq = 0;
billNum++;
}
pthread_mutex_unlock(&m_mutex);
//sleep();
}
int main(int argc, char** argv)
{
if(argc != 6)
{
cout << "必须有5个参数 任务执行次数 任务池大小 线程池大小 预创建线程数 输出间隔" << endl;
cout << "eg: ./test 999999 10000 100 10 20" << endl;
cout << "上例代表创建一个间隔20个任务输出,任务池大小为10000,线程池大小为100,预创建10个线程,执行任务次数为:999999" << endl;
return 0;
}
double loopSize = atof(argv[1]);
int taskSize = atoi(argv[2]);
int threadPoolSize = atoi(argv[3]);
int preSize = atoi(argv[4]);
inter = atoi(argv[5]);
pthread_mutex_init(&m_mutex,NULL);
ThreadPoolManager manager;
if(0>manager.Init(taskSize, threadPoolSize, preSize))
{
cout << "初始化失败" << endl;
return 0;
}
cout << "*******************初始化完成*********************" << endl;
struct timeval time_beg, time_end;
memset(&time_beg, 0, sizeof(struct timeval));
memset(&time_end, 0, sizeof(struct timeval));
gettimeofday(&time_beg, NULL);
double i=0;
for(; i<loopSize; ++i)
{
while(0>manager.Run(myFunc,NULL))
{
usleep(100);
}
}
gettimeofday(&time_end, NULL);
long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
cout << "total time =" << total << endl;
cout << "total num =" << i << " billion num=" << billNum<< endl;
cout << __FILE__ << "将关闭所有线程" << endl;
//pthread_mutex_destroy(&m_mutex);
return 0;
}