C 语言

关注公众号 jb51net

关闭
首页 > 软件编程 > C 语言 > c++线程池

c++线程池实现方法

作者:liujian0616

这篇文章主要介绍了c++线程池实现方法,实例分析了C++线程池的原理与相关实现技巧,需要的朋友可以参考下

本文实例讲述了c++线程池实现方法。分享给大家供大家参考。具体分析如下:

下面这个线程池是我在工作中用到过的。原理是建立一个任务队列,让多个线程互斥地从队列中取出任务并执行;显然,队列是需要加锁的。

环境:Ubuntu Linux

文件名:locker.h

#ifndef LOCKER_H_ 
#define LOCKER_H_ 
#include "pthread.h" 
// Thin wrapper around a POSIX mutex (pthread_mutex_t).
// The constructor initialises the mutex and the destructor destroys it.
// Instances cannot be copied: a pthread_mutex_t must not be duplicated,
// and two copies would both try to destroy the same underlying mutex.
class locker
{
public:
  // Initialises the wrapped mutex.
  locker();
  // Destroys the wrapped mutex.
  virtual ~locker();
  // Acquires the mutex. Returns true when pthread_mutex_lock() reports
  // success (0) and false otherwise.
  bool lock();
  // Releases the mutex.
  void unlock();
private:
  // Copying is disabled: declared private and intentionally never defined.
  locker(const locker&);
  locker& operator=(const locker&);

  pthread_mutex_t   m_mutex;
};
#endif /* LOCKER_H_ */ 

文件名:locker.cpp

#include "locker.h" 
// Builds the lock by initialising the wrapped pthread mutex.
locker::locker()
{
  // No attribute object is supplied (null pointer), so the mutex is
  // created with the default attributes.
  const pthread_mutexattr_t* const default_attr = 0;
  pthread_mutex_init(&m_mutex, default_attr);
}
// Tears down the wrapped pthread mutex.
locker::~locker()
{
  // The status returned by pthread_mutex_destroy() is discarded.
  static_cast<void>(pthread_mutex_destroy(&m_mutex));
}
// Acquires the mutex, blocking until it is available.
// Returns true if pthread_mutex_lock() succeeded (returned 0), false otherwise.
bool locker::lock()
{
  const int rc = pthread_mutex_lock(&m_mutex);
  return rc == 0;
}
// Releases the mutex previously acquired with lock().
void locker::unlock()
{
  // The status returned by pthread_mutex_unlock() is discarded.
  static_cast<void>(pthread_mutex_unlock(&m_mutex));
}

文件名:task_list.h

#ifndef TASK_LIST_H_ 
#define TASK_LIST_H_ 
#include "list" 
#include "locker.h" 
#include "netinet/in.h" 
#include "semaphore.h" 
using namespace std; 
// Signature of a function that can be run as a task: it receives one
// opaque pointer and returns an opaque pointer.
typedef void* (*THREAD_FUNC)(void*);

// A task run by the thread pool.
// Original author's note (translated): for downlink tasks, "sin" holds the
// destination address information; parm0 points to the object that sent the
// data, parm1 points to the data, and parm2 is the length of the data.
// NOTE(review): this struct has no "sin" member, so that part of the note
// looks stale -- confirm against the original project.
struct task_info
{
  THREAD_FUNC func;   // function executed for this task
  void* parm0;
  void* parm1;
  void* parm2;
};

// Container of task pointers and its iterator type.
typedef std::list<task_info*> TASK_LIST;
typedef TASK_LIST::iterator PTASK_LIST;
// Queue of pending tasks shared between producers and worker threads.
// A locker protects the underlying list, and a semaphore counts the queued
// tasks so that fetch_task() can block while the queue is empty.
// The destructor deletes every task_info still queued, so the list must not
// be copied (two copies would delete the same tasks and destroy the same
// semaphore).
class task_list
{
public:
  // Creates an empty queue and initialises the semaphore to 0.
  task_list();
  // Deletes all tasks still queued and destroys the semaphore.
  virtual ~task_list();
  // Appends tsk to the back of the queue and posts the semaphore.
  void append_task(task_info* tsk);
  // Waits on the semaphore, then removes and returns the task at the
  // front of the queue.
  task_info* fetch_task();
private:
  // Copying is disabled: declared private and intentionally never defined.
  task_list(const task_list&);
  task_list& operator=(const task_list&);

  TASK_LIST m_tasklist;   // pending tasks, oldest first
  locker m_lk;            // guards m_tasklist
  sem_t m_sem;            // posted once per appended task
};
#endif /* TASK_LIST_H_ */

文件名:task_list.cpp

#include "task_list.h" 
// Creates an empty task queue.
task_list::task_list()
{
  // Start from an empty list.
  m_tasklist.clear();
  // Semaphore used only by threads of this process (pshared = 0), with an
  // initial count of 0 because no task is queued yet.
  sem_init(&m_sem, 0, 0);
}
// Releases every task still queued, then destroys the semaphore.
task_list::~task_list()
{
  for (PTASK_LIST it = m_tasklist.begin(); it != m_tasklist.end(); ++it)
  {
    // Deleting a null pointer is a no-op, so no explicit check is needed.
    delete *it;
  }
  m_tasklist.clear();
  sem_destroy(&m_sem);
}
// Appends tsk to the back of the queue and wakes one waiting consumer.
//   tsk: task to enqueue; the pointer is stored as-is.
void task_list::append_task(task_info* tsk)
{
  // Scoped guard: guarantees m_lk is released even if push_back() throws
  // (e.g. std::bad_alloc), which would otherwise leave the queue locked
  // forever.
  struct scoped_lock
  {
    explicit scoped_lock(locker& l) : lk(l) { lk.lock(); }
    ~scoped_lock() { lk.unlock(); }
    locker& lk;
  };

  {
    // Lock only while the list is being modified.
    scoped_lock guard(m_lk);
    m_tasklist.push_back(tsk);
  }
  // Post after unlocking so a woken consumer can take the lock immediately.
  sem_post(&m_sem);
}
// Blocks until a task is available, then removes and returns the task at
// the front of the queue.
// Returns NULL if the wait on the semaphore fails (for example when it is
// interrupted by a signal) or if the queue is unexpectedly empty; callers
// are expected to simply call fetch_task() again in that case.
task_info* task_list::fetch_task()
{
  // A failed sem_wait() did not consume a token, so the list may be empty.
  // Touching front()/pop_front() then would be undefined behaviour.
  if (sem_wait(&m_sem) != 0)
    return NULL;

  task_info* tr = NULL;
  m_lk.lock();
  if (!m_tasklist.empty())
  {
    tr = m_tasklist.front();
    m_tasklist.pop_front();
  }
  m_lk.unlock();
  return tr;
}

文件名:thread_pool.h

#ifndef THREAD_POOL_H_ 
#define THREAD_POOL_H_ 
#include "task_list.h" 
#include "pthread.h" 
// Number of worker threads created when create_threads() is called
// without an argument.
#define DEFAULT_THREAD_COUNT  4
// Upper bound on the number of worker threads; also the capacity of the
// thread-handle array inside thread_pool.
#define MAXIMUM_THREAD_COUNT  1000

// Pool of worker threads that repeatedly fetch tasks from a task_list and
// run them. The pool stores only a pointer to the task list; it never
// deletes the list.
// Instances cannot be copied: a copy would hold the same thread handles and
// try to cancel and join them a second time.
class thread_pool
{
public:
  // Creates a pool with no threads and no task list attached.
  thread_pool();
  // Cancels and joins all worker threads.
  virtual ~thread_pool();
  // Cancels any existing workers, then starts up to n new ones (n is capped
  // at MAXIMUM_THREAD_COUNT). Returns the number of threads actually created.
  int create_threads(int n = DEFAULT_THREAD_COUNT);
  // Cancels and joins every worker thread.
  void delete_threads();
  // Attaches the task list the workers fetch from.
  void set_tasklist(task_list* plist);
  // Detaches the task list (the list itself is not deleted).
  void del_tasklist();
protected:
  // Thread entry point; parm is the owning thread_pool.
  static void* thread_func(void* parm);
  // Fetches the next task from the attached list, or NULL if none is attached.
  task_info* get_task();
private:
  // Copying is disabled: declared private and intentionally never defined.
  thread_pool(const thread_pool&);
  thread_pool& operator=(const thread_pool&);

  int       m_thread_cnt;                     // number of live workers
  pthread_t    m_pids[MAXIMUM_THREAD_COUNT];  // handles of the workers
  task_list*   m_tasklist;                    // not owned; may be NULL
};
#endif /* THREAD_POOL_H_ */ 

文件名:thread_pool.cpp

#include "thread_pool.h" 
// Creates an idle pool: no worker threads and no task list attached.
thread_pool::thread_pool()
  : m_thread_cnt(0),
    m_tasklist(NULL)
{
}
// Stops every worker thread before the pool goes away.
thread_pool::~thread_pool()
{
  this->delete_threads();
}
// Returns the next task from the attached task list, or NULL when no task
// list is attached. Delegates to task_list::fetch_task().
task_info* thread_pool::get_task()
{
  if (m_tasklist == NULL)
    return NULL;
  return m_tasklist->fetch_task();
}
// Entry point of every worker thread.
//   parm: the thread_pool that created the thread.
// Loops forever fetching tasks and running them; the thread ends only when
// it is cancelled (see delete_threads()). The return statement is never
// reached.
void* thread_pool::thread_func(void* parm)
{
  thread_pool* ptp = static_cast<thread_pool*>(parm);
  while (true)
  {
    task_info* task = ptp->get_task();
    if (task && task->func)
    {
      // The task itself is passed as the argument; func is responsible for
      // releasing the task_info, so it is not deleted here.
      (*task->func)(task);
    }
    else
    {
      // Nothing runnable was returned (no task list attached, a NULL task,
      // or a task without a function -- the latter is skipped, not freed).
      // Without an explicit cancellation point this loop could never be
      // cancelled, and delete_threads() would block forever in
      // pthread_join().
      pthread_testcancel();
      // Give up the CPU instead of spinning at full speed.
      sched_yield();
    }
  }
  return NULL;
}
// Starts worker threads.
//   n: requested number of workers; values above MAXIMUM_THREAD_COUNT are
//      reduced to MAXIMUM_THREAD_COUNT.
// Any workers that already exist are cancelled first. Creation stops at the
// first pthread_create() failure.
// Returns the number of threads that were actually created.
int thread_pool::create_threads(int n)
{
  const int wanted = (n > MAXIMUM_THREAD_COUNT) ? MAXIMUM_THREAD_COUNT : n;

  // Also resets m_thread_cnt to 0.
  delete_threads();

  for (int idx = 0; idx < wanted; ++idx)
  {
    if (pthread_create(&m_pids[idx], NULL, thread_func, static_cast<void*>(this)) != 0)
      break;
    ++m_thread_cnt;
  }
  return m_thread_cnt;
}
// Cancels each worker thread and waits for it to terminate, then marks the
// pool as having no workers.
void thread_pool::delete_threads()
{
  int idx = 0;
  while (idx < m_thread_cnt)
  {
    const pthread_t worker = m_pids[idx++];
    pthread_cancel(worker);
    // The thread's exit status is collected but not used.
    void* exit_status;
    pthread_join(worker, &exit_status);
  }
  m_thread_cnt = 0;
}
// Attaches the task list that worker threads fetch from.
//   plist: list to use; only the pointer is stored.
void thread_pool::set_tasklist(task_list* plist)
{
  this->m_tasklist = plist;
}
void thread_pool::del_tasklist() 
{ 
  m_tasklist = NULL; 
}

文件名:test.cpp

#include "unistd.h" 
#include "stdio.h" 
#include "stdlib.h" 
#include "task_list.h" 
#include "thread_pool.h" 
void* fun(void *parm) 
{ 
  task_info* ptk = (task_info*)parm; 
  pid_t tid = pthread_self(); 
  int count = (int)ptk->parm0; 
  printf("count=%d, tid=%d\n", count, tid); 
  return NULL; 
} 
int main() 
{ 
  int count = 0; 
  thread_pool tp; 
  task_list tl; 
  tp.create_threads(4 - 1); 
  tp.set_tasklist(&tl); 
  while (1) 
  { 
    task_info* pti = NULL; 
    pti = (task_info *) malloc(sizeof(task_info)); 
    pti->func = fun; 
    pti->parm0 = (void *)count; 
    tl.append_task(pti); 
    count++; 
    sleep(2); 
  } 
// printf("hello,world\n"); 
  return 0; 
} 

编译运行:我是用 Eclipse 建立的 automake 工程,所以只要修改一下 Makefile.am 就可以编译成功了。
文件名:Makefile.am

# Builds a single program named "test" from the sources below and links it
# against the pthread library.
# Note: the backslash must be the very last character on a continued line;
# trailing whitespace after it breaks the continuation.
bin_PROGRAMS=test
test_SOURCES=test.cpp locker.h locker.cpp \
              task_list.h task_list.cpp \
              thread_pool.h thread_pool.cpp
test_LDADD=-lpthread

执行结果:

count=0, tid=-1219888272 
count=1, tid=-1219888272 
count=2, tid=-1228280976 
count=3, tid=-1236673680 
count=4, tid=-1219888272 
count=5, tid=-1228280976 
count=6, tid=-1236673680 
count=7, tid=-1219888272 
count=8, tid=-1228280976 
count=9, tid=-1236673680 

希望本文所述对大家的C++程序设计有所帮助。

您可能感兴趣的文章:
阅读全文