C语言实现一个多线程委托模型的示例详解
作者:图灵,图灵,图个机灵
C语言实现一个多线程委托模型
多线程委托模型将线程分为boss线程(主线程)和worker线程(工作线程)。先从一个主线程开始运行,主线程根据情况完成工作线程的创建,将创建好的工作线程放入队列中,有工作时,主线程唤醒工作参与工作。如果工作线程产生异常,主线程可以关闭工作线程并开启新的工作线程。
以下是使用C语言实现多线程委托模型的代码,其中包含boss线程和worker线程,boss线程用于创建worker线程并将其放入工作队列中,有任务时唤醒worker线程:
#include <pthread.h> #include <stdio.h> #include <stdlib.h> typedef struct { void *(*task)(void *arg); void *arg; } Task; Task *task_create(void *(*task)(void *arg), void *arg); void task_destroy(Task *task); typedef struct { int thread_count; int task_count; int head; int tail; Task **tasks; pthread_mutex_t mutex; pthread_cond_t done; } ThreadPool; ThreadPool *threadpool_create(int thread_count, int task_count); void threadpool_destroy(ThreadPool *pool); void threadpool_add_task(ThreadPool *pool, void *(*task)(void *arg), void *arg); Task *threadpool_get_task(ThreadPool *pool); void *worker_thread(void *arg); Task *task_create(void *(*task)(void *arg), void *arg) { Task *t = (Task*) malloc(sizeof(Task)); t->task = task; t->arg = arg; return t; } void task_destroy(Task *task) { free(task); } ThreadPool *threadpool_create(int thread_count, int task_count) { ThreadPool *pool = (ThreadPool*) malloc(sizeof(ThreadPool)); pool->thread_count = thread_count; pool->task_count = task_count; pool->head = pool->tail = 0; pool->tasks = (Task**) malloc(sizeof(Task*) * task_count); pthread_mutex_init(&pool->mutex, NULL); pthread_cond_init(&pool->done, NULL); int i; for (i = 0; i < pool->thread_count; i++) { pthread_t thread; pthread_create(&thread, NULL, worker_thread, pool); pthread_detach(thread); } return pool; } void threadpool_destroy(ThreadPool *pool) { pthread_mutex_lock(&pool->mutex); int i; for (i = 0; i < pool->tail; i++) { task_destroy(pool->tasks[i]); } free(pool->tasks); free(pool); pthread_mutex_unlock(&pool->mutex); pthread_mutex_destroy(&pool->mutex); pthread_cond_destroy(&pool->done); } void threadpool_add_task(ThreadPool *pool, void *(*task)(void *arg), void *arg) { pthread_mutex_lock(&pool->mutex); Task *t = task_create(task, arg); if (pool->tail == pool->task_count) { pool->task_count *= 2; pool->tasks = (Task**) realloc(pool->tasks, sizeof(Task*) * pool->task_count); } pool->tasks[pool->tail++] = t; pthread_cond_signal(&pool->done); pthread_mutex_unlock(&pool->mutex); } Task *threadpool_get_task(ThreadPool *pool) { pthread_mutex_lock(&pool->mutex); while (pool->head == pool->tail) { pthread_cond_wait(&pool->done, &pool->mutex); } Task *t = pool->tasks[pool->head++]; pthread_mutex_unlock(&pool->mutex); return t; } void *worker_thread(void *arg) { ThreadPool *pool = (ThreadPool*) arg; for (;;) { Task *t = threadpool_get_task(pool); (*(t->task))(t->arg); task_destroy(t); } return NULL; } void * boss_task(void *arg) { ThreadPool *pool = (ThreadPool*) arg; // 在boss线程中添加任务 int i; for (i = 0; i < 10; i++) { threadpool_add_task(pool, worker_task, NULL); } return NULL; } void * worker_task(void *arg) { printf("Worker thread running\n"); return NULL; } int main(int argc, char *argv[]) { ThreadPool *pool = threadpool_create(4, 10); threadpool_add_task(pool, boss_task, pool); sleep(10); threadpool_destroy(pool); return 0; }
在这个示例中,我们定义了一个ThreadPool
结构体,其中包括一个任务数组、一个锁和一个条件变量。worker_thread
函数是用于执行任务的线程函数,而threadpool_create
、threadpool_add_task
和threadpool_get_task
函数用于创建、管理和调度任务。
在main
函数中,我们创建了一个包含4个线程、最大任务数量为10的线程池,并在其中添加了一个boss线程,用于向线程池中添加worker线程任务。在每个worker任务中,我们只输出一条消息,表示线程正在运行。
这就是一个使用C语言实现的多线程委托模型,其中包含了boss线程和worker线程。在实际使用时,应根据具体应用场景进行更进一步的修改和扩展。
如果工作线程产生异常,主线程可以关闭工作线程并开启新的工作线程
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <unistd.h> #include <errno.h> typedef struct { void *(*task)(void *arg); void *arg; } Task; Task *task_create(void *(*task)(void *arg), void *arg); void task_destroy(Task *task); typedef struct { int thread_count; int task_count; int head; int tail; Task **tasks; pthread_mutex_t mutex; pthread_cond_t done; } ThreadPool; ThreadPool *threadpool_create(int thread_count, int task_count); void threadpool_destroy(ThreadPool *pool); void threadpool_add_task(ThreadPool *pool, void *(*task)(void *arg), void *arg); Task *threadpool_get_task(ThreadPool *pool); void *worker_thread(void *arg); Task *task_create(void *(*task)(void *arg), void *arg) { Task *t = (Task*) malloc(sizeof(Task)); t->task = task; t->arg = arg; return t; } void task_destroy(Task *task) { free(task); } ThreadPool *threadpool_create(int thread_count, int task_count) { ThreadPool *pool = (ThreadPool*) malloc(sizeof(ThreadPool)); pool->thread_count = thread_count; pool->task_count = task_count; pool->head = pool->tail = 0; pool->tasks = (Task**) malloc(sizeof(Task*) * task_count); pthread_mutex_init(&pool->mutex, NULL); pthread_cond_init(&pool->done, NULL); int i; for (i = 0; i < pool->thread_count; i++) { pthread_t thread; pthread_create(&thread, NULL, worker_thread, pool); pthread_detach(thread); } return pool; } void threadpool_destroy(ThreadPool *pool) { pthread_mutex_lock(&pool->mutex); int i; for (i = 0; i < pool->tail; i++) { task_destroy(pool->tasks[i]); } free(pool->tasks); free(pool); pthread_mutex_unlock(&pool->mutex); pthread_mutex_destroy(&pool->mutex); pthread_cond_destroy(&pool->done); } void threadpool_add_task(ThreadPool *pool, void *(*task)(void *arg), void *arg) { pthread_mutex_lock(&pool->mutex); Task *t = task_create(task, arg); if (pool->tail == pool->task_count) { pool->task_count *= 2; pool->tasks = (Task**) realloc(pool->tasks, sizeof(Task*) * pool->task_count); } pool->tasks[pool->tail++] = t; pthread_cond_signal(&pool->done); pthread_mutex_unlock(&pool->mutex); } Task *threadpool_get_task(ThreadPool *pool) { pthread_mutex_lock(&pool->mutex); while (pool->head == pool->tail) { pthread_cond_wait(&pool->done, &pool->mutex); } Task *t = pool->tasks[pool->head++]; pthread_mutex_unlock(&pool->mutex); return t; } void *worker_thread(void *arg) { ThreadPool *pool = (ThreadPool*) arg; for (;;) { Task *t = threadpool_get_task(pool); int ret = 0; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); ret = (*(t->task))(t->arg); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); task_destroy(t); if (ret != 0) { pthread_mutex_lock(&pool->mutex); printf("Worker thread exited with error: %d\n", ret); pool->thread_count -= 1; pthread_mutex_unlock(&pool->mutex); pthread_exit(NULL); } } return NULL; } void signal_handler(int signum) { // 忽略这里的信号处理函数 } void * boss_task(void *arg) { ThreadPool *pool = (ThreadPool*) arg; // 安装一个信号处理函数,方便关闭工作线程 struct sigaction act; act.sa_handler = signal_handler; sigaction(SIGUSR1, &act, NULL); // 在boss线程中添加任务 int i; for (i = 0; i < 10; i++) { threadpool_add_task(pool, worker_task, NULL); } return NULL; } void * worker_task(void *arg) { int i; for (i = 0; i < 10; i++) { printf("Worker thread running: %d\n", i); sleep(1); // 模拟工作线程异常 if (i == 5) { printf("Worker thread encountered an error\n"); // 发送信号关闭工作线程 pthread_kill(pthread_self(), SIGUSR1); return (void*) 1; } } return NULL; } int main(int argc, char *argv[]) { ThreadPool *pool = threadpool_create(4, 10); threadpool_add_task(pool, boss_task, pool); // 运行10秒后退出 sleep(10); // 关闭所有工作线程 int i; for (i = 0; i < pool->thread_count; i++) { pthread_cancel(0); } threadpool_destroy(pool); return 0; }
在这个示例中,我们基本上沿用了前面的代码,只是添加了处理工作线程异常的代码。我们在worker_thread函数中,通过调用pthread_setcancelstate函数禁止了线程被取消,然后执行工作任务,最后恢复线程的取消状态。如果线程执行任务时出现异常,我们在主线程中通过发送信号SIGUSR1来关闭工作线程。
在boss_task中添加的任务只是简单地输出一条消息,模拟了一些随机的操作。这里我们安装了一个信号处理函数,方便在工作线程内部发生异常时正确关闭线程。在main函数中,我们运行了10秒钟,然后通过pthread_cancel函数关闭了所有工作线程。
这就是一个使用C语言实现多线程委托模型的例子,其中包含boss线程和worker线程,可以处理工作线程的异常情况。从这个示例中,我们可以学到如何创建线程池,如何向线程池中添加任务,如何安全地关闭线程池,以及如何正确处理线程异常等知识。
到此这篇关于C语言实现一个多线程委托模型的文章就介绍到这了,更多相关C语言多线程委托模型内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!