linux多线程编程详解教程(线程通过信号量实现通信代码)
作者:
线程分类
线程按照其调度者可以分为用户级线程和核心级线程两种。
(1)用户级线程
用户级线程主要解决的是上下文切换的问题,它的调度算法和调度过程全部由用户自行选择决定,在运行时不需要特定的内核支持。在这里,操作系统往往会提供一个用户空间的线程库,该线程库提供了线程的创建、调度、撤销等功能,而内核仍然仅对进程进行管理。如果一个进程中的某一个线程调用了一个阻塞的系统调用,那么该进程包括该进程中的其他所有线程也同时被阻塞。这种用户级线程的主要缺点是在一个进程中的多个线程的调度中无法发挥多处理器的优势。
(2)核心级线程
这种线程允许不同进程中的线程按照同一相对优先调度方法进行调度,这样就可以发挥多处理器的并发优势。
现在大多数系统都采用用户级线程与核心级线程并存的方法。一个用户级线程可以对应一个或几个核心级线程,也就是“一对一”或“多对一”模型。这样既可满足多处理机系统的需要,也可以最大限度地减少调度开销。
Linux的线程实现是在核外进行的,核内提供的是创建进程的接口do_fork()。内核提供了两个系统调用clone()和fork(),最终都用不同的参数调用do_fork()核内API。当然,要想实现线程,没有核心对多进程(其实是轻量级进程)共享数据段的支持是不行的,因此,do_fork()提供了很多参数,包括CLONE_VM(共享内存空间)、CLONE_FS(共享文件系统信息)、 CLONE_FILES(共享文件描述符表)、CLONE_SIGHAND(共享信号句柄表)和CLONE_PID(共享进程ID,仅对核内进程,即0号进程有效)。当使用fork系统调用时,内核调用do_fork()不使用任何共享属性,进程拥有独立的运行环境,而使用 pthread_create()来创建线程时,则最终设置了所有这些属性来调用__clone(),而这些参数又全部传给核内的do_fork(),从而创建的“进程”拥有共享的运行环境,只有栈是独立的,由__clone()传入。
Linux线程在核内是以轻量级进程的形式存在的,拥有独立的进程表项,而所有的创建、同步、删除等操作都在核外pthread库中进行。pthread 库使用一个管理线程(__pthread_manager(),每个进程独立且唯一)来管理线程的创建和终止,为线程分配线程ID,发送线程相关的信号(比如Cancel),而主线程(pthread_create())的调用者则通过管道将请求信息传给管理线程。
主要函数说明
1.线程的创建和退出
pthread_create 线程创建函数
int pthread_create (pthread_t * thread_id,__const pthread_attr_t * __attr,void *(*__start_routine) (void *),void *__restrict __arg);
线程创建函数第一个参数为指向线程标识符的指针,第二个参数用来设置线程属性,第三个参数是线程运行函数的起始地址,最后一个参数是运行函数的参数。这里,我们的函数thread 不需要参数,所以最后一个参数设为空指针。第二个参数我们也设为空指针,这样将生成默认属性的线程。当创建线程成功时,函数返回0,若不为0 则说明创建线程失败,常见的错误返回代码为EAGAIN 和EINVAL。前者表示系统限制创建新的线程,例如线程数目过多了;后者表示第二个参数代表的线程属性值非法。创建线程成功后,新创建的线程则运行参数三和参数四确定的函数,原来的线程则继续运行下一行代码。
pthread_join 函数,来等待一个线程的结束。
函数原型为:int pthread_join (pthread_t __th, void **__thread_return)
第一个参数为被等待的线程标识符,第二个参数为一个用户定义的指针,它可以用来存储被等待线程的返回值。这个函数是一个线程阻塞的函数,调用它的函数将一直等待到被等待的线程结束为止,当函数返回时,被等待线程的资源被收回。线程只能被一个线程等待终止,并且应处于joinable状态(非detached)。
pthread_exit 函数
一个线程的结束有两种途径,一种是线程运行的函数结束了,调用它的线程也就结束了;
另一种方式是通过函数pthread_exit 来实现。它的函数原型为:void pthread_exit (void *__retval)唯一的参数是函数的返回代码,只要pthread_join 中的第二个参数thread_return 不是NULL,这个值将被传递给thread_return。最后要说明的是,一个线程不能被多个线程等待,否则第一个接收到信号的线程成功返回,其余调用pthread_join 的线程则返回错误代码ESRCH。
2.线程属性
pthread_create函数的第二个参数线程的属性。将该值设为NULL,也就是采用默认属性,线程的多项属性都是可以更改的。这些属性主要包括绑定属性、分离属性、堆栈地址、堆栈大小、优先级。其中系统默认的属性为非绑定、非分离、缺省1M 的堆栈、与父进程同样级别的优先级。下面首先对绑定属性和分离属性的基本概念进行讲解。
绑定属性:Linux中采用“一对一”的线程机制,也就是一个用户线程对应一个内核线程。绑定属性就是指一个用户线程固定地分配给一个内核线程,因为CPU时间片的调度是面向内核线程 (也就是轻量级进程)的,因此具有绑定属性的线程可以保证在需要的时候总有一个内核线程与之对应。而与之相对的非绑定属性就是指用户线程和内核线程的关系不是始终固定的,而是由系统来控制分配的。
分离属性:分离属性是用来决定一个线程以什么样的方式来终止自己。在非分离情况下,当一个线程结束时,它所占用的系统资源并没有被释放,也就是没有真正的终止。只有当pthread_join()函数返回时,创建的线程才能释放自己占用的系统资源。而在分离属性情况下,一个线程结束时立即释放它所占有的系统资源。
这里要注意的一点是,如果设置一个线程的分离属性,而这个线程运行又非常快,那么它很可能在pthread_create 函数返回之前就终止了,它终止以后就可能将线程号和系统资源移交给其他的线程使用,这时调用pthread_create 的线程就得到了错误的线程号。
设置绑定属性:
int pthread_attr_init(pthread_attr_t *attr)
int pthread_attr_setscope(pthread_attr_t *attr, int scope)
int pthread_attr_getscope(pthread_attr_t *tattr, int *scope)
scope:PTHREAD_SCOPE_SYSTEM:绑定,此线程与系统中所有的线程竞争 PTHREAD_SCOPE_PROCESS:非绑定,此线程与进程中的其他线程竞争
设置分离属性:
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)
int pthread_attr_getdetachstate(const pthread_attr_t *tattr,int *detachstate)
detachstate PTHREAD_CREATE_DETACHED:分离 PTHREAD _CREATE_JOINABLE:非分离
设置调度策略:
int pthread_attr_setschedpolicy(pthread_attr_t * tattr, int policy)
int pthread_attr_getschedpolicy(pthread_attr_t * tattr, int *policy)
policy SCHED_FIFO:先入先出 SCHED_RR:循环 SCHED_OTHER:实现定义的方法
设置优先级:
int pthread_attr_setschedparam (pthread_attr_t *attr, struct sched_param *param)
int pthread_attr_getschedparam (pthread_attr_t *attr, struct sched_param *param)
3.线程访问控制
1)互斥锁(mutex)
通过锁机制实现线程间的同步。同一时刻只允许一个线程执行一个关键部分的代码。
1 int pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutex_attr_t *mutexattr);
2 int pthread_mutex_lock(pthread_mutex_t *mutex);
3 int pthread_mutex_unlock(pthread_mutex_t *mutex);
4 int pthread_mutex_destroy(pthread_mutex_t *mutex);
(1)先初始化锁init()或静态赋值pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIER
(2)加锁,lock,trylock,lock阻塞等待锁,trylock立即返回EBUSY
(3)解锁,unlock需满足是加锁状态,且由加锁线程解锁
(4)清除锁,destroy(此时锁必需unlock,否则返回EBUSY)
mutex 分为递归(recursive) 和非递归(non-recursive)两种,这是POSIX 的叫法,另外的名字是可重入(Reentrant) 与非可重入。这两种mutex 作为线程间(inter-thread) 的同步工具时没有区别,它们的惟一区别在于:同一个线程可以重复对recursive mutex 加锁,但是不能重复对non-recursive mutex 加锁。
首选非递归mutex,绝对不是为了性能,而是为了体现设计意图。non-recursive 和recursive 的性能差别其实不大,因为少用一个计数器,前者略快一点点而已。在同一个线程里多次对non-recursive mutex 加锁会立刻导致死锁,我认为这是它的优点,能帮助我们思考代码对锁的期求,并且及早(在编码阶段)发现问题。毫无疑问recursive mutex 使用起来要方便一些,因为不用考虑一个线程会自己把自己给锁死了,我猜这也是Java 和Windows 默认提供recursive mutex 的原因。(Java 语言自带的intrinsic lock 是可重入的,它的concurrent 库里提供ReentrantLock,Windows的CRITICAL_SECTION 也是可重入的。似乎它们都不提供轻量级的non-recursive mutex。)
2)条件变量(cond)
利用线程间共享的全局变量进行同步的一种机制。
1 int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t *cond_attr);
2 int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
3 int pthread_cond_timedwait(pthread_cond_t *cond,pthread_mutex_t *mutex,const timespec *abstime);
4 int pthread_cond_destroy(pthread_cond_t *cond);
5 int pthread_cond_signal(pthread_cond_t *cond);
6 int pthread_cond_broadcast(pthread_cond_t *cond); //解除所有线程的阻塞
(1)初始化. init()或者pthread_cond_t cond=PTHREAD_COND_INITIALIER;属性置为NULL
(2)等待条件成立. pthread_cond_wait,pthread_cond_timedwait.
wait()释放锁,并阻塞等待条件变量为真
timedwait()设置等待时间,仍未signal,返回ETIMEOUT(加锁保证只有一个线程wait)
(3)激活条件变量:pthread_cond_signal,pthread_cond_broadcast(激活所有等待线程)
(4)清除条件变量:destroy; 无线程等待,否则返回EBUSY
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
这两个函数一定要在mutex的锁定区域内使用。
调用 pthread_cond_signal() 释放被条件阻塞的线程时,如果没有任何线程基于条件变量阻塞,则调用pthread_cond_signal()不起作用。而对于 Windows,当调用 SetEvent 触发 Auto-reset 的 Event 条件时,如果没有被条件阻塞的线程,那么此函数仍然起作用,条件变量会处在触发状态。
Linux下生产者消费者问题(使用互斥锁和条件变量):
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "pthread.h"
#define BUFFER_SIZE 16
struct prodcons
{
int buffer[BUFFER_SIZE];
pthread_mutex_t lock; //mutex ensuring exclusive access to buffer
int readpos,writepos; //position for reading and writing
pthread_cond_t notempty; //signal when buffer is not empty
pthread_cond_t notfull; //signal when buffer is not full
};
//initialize a buffer
void init(struct prodcons* b)
{
pthread_mutex_init(&b->lock,NULL);
pthread_cond_init(&b->notempty,NULL);
pthread_cond_init(&b->notfull,NULL);
b->readpos = 0;
b->writepos = 0;
}
//store an integer in the buffer
void put(struct prodcons* b, int data)
{
pthread_mutex_lock(&b->lock);
//wait until buffer is not full
while((b->writepos+1)%BUFFER_SIZE == b->readpos)
{
printf("wait for not full\n");
pthread_cond_wait(&b->notfull,&b->lock);
}
b->buffer[b->writepos] = data;
b->writepos++;
b->writepos %= BUFFER_SIZE;
pthread_cond_signal(&b->notempty); //signal buffer is not empty
pthread_mutex_unlock(&b->lock);
}
//read and remove an integer from the buffer
int get(struct prodcons* b)
{
int data;
pthread_mutex_lock(&b->lock);
//wait until buffer is not empty
while(b->writepos == b->readpos)
{
printf("wait for not empty\n");
pthread_cond_wait(&b->notempty,&b->lock);
}
data=b->buffer[b->readpos];
b->readpos++;
b->readpos %= BUFFER_SIZE;
pthread_cond_signal(&b->notfull); //signal buffer is not full
pthread_mutex_unlock(&b->lock);
return data;
}
#define OVER -1
struct prodcons buffer;
void * producer(void * data)
{
int n;
for(n=0; n<50; ++n)
{
printf("put-->%d\n",n);
put(&buffer,n);
}
put(&buffer,OVER);
printf("producer stopped\n");
return NULL;
}
void * consumer(void * data)
{
int n;
while(1)
{
int d = get(&buffer);
if(d == OVER) break;
printf("get-->%d\n",d);
}
printf("consumer stopped\n");
return NULL;
}
int main()
{
pthread_t tha,thb;
void * retval;
init(&buffer);
pthread_creare(&tha,NULL,producer,0);
pthread_creare(&thb,NULL,consumer,0);
pthread_join(tha,&retval);
pthread_join(thb,&retval);
return 0;
}
3)信号量
如同进程一样,线程也可以通过信号量来实现通信,虽然是轻量级的。
信号量函数的名字都以"sem_"打头。线程使用的基本信号量函数有四个。
#include <semaphore.h>
int sem_init(sem_t *sem , int pshared, unsigned int value);
这是对由sem指定的信号量进行初始化,设置好它的共享选项(linux只支持为0,即表示它是当前进程的局部信号量),然后给它一个初始值VALUE。
两个原子操作函数:这两个函数都要用一个由sem_init调用初始化的信号量对象的指针做参数。
int sem_wait(sem_t *sem); //给信号量减1,对一个值为0的信号量调用sem_wait,这个函数将会等待直到有其它线程使它不再是0为止。
int sem_post(sem_t *sem); //给信号量的值加1
int sem_destroy(sem_t *sem);
这个函数的作用是再我们用完信号量后都它进行清理。归还自己占有的一切资源。
用信号量实现生产者消费者:
这里使用4个信号量,其中两个信号量occupied和empty分别用于解决生产者和消费者线程之间的同步问题,pmut用于多个生产者之间互斥问题,cmut是用于多个消费者之间互斥问题。其中empty初始化为N(有界缓区的空间元数),occupied初始化为0,pmut和cmut初始化为1。
参考代码:
#define BSIZE 64
typedef struct
{
char buf[BSIZE];
sem_t occupied;
sem_t empty;
int nextin;
int nextout;
sem_t pmut;
sem_t cmut;
}buffer_t;
buffer_t buffer;
void init(buffer_t * b)
{
sem_init(&b->occupied, 0, 0);
sem_init(&b->empty,0, BSIZE);
sem_init(&b->pmut, 0, 1);
sem_init(&b->cmut, 0, 1);
b->nextin = b->nextout = 0;
}
void producer(buffer_t *b, char item)
{
sem_wait(&b->empty);
sem_wait(&b->pmut);
b->buf[b->nextin] = item;
b->nextin++;
b->nextin %= BSIZE;
sem_post(&b->pmut);
sem_post(&b->occupied);
}
char consumer(buffer_t *b)
{
char item;
sem_wait(&b->occupied);
sem_wait(&b->cmut);
item = b->buf[b->nextout];
b->nextout++;
b->nextout %= BSIZE;
sem_post(&b->cmut);
sem_post(&b->empty);
return item;
}