GoLang协程库libtask学习笔记
作者:Onemorelight95
协程解决了什么问题
我们先从一次网络IO请求过程中的read操作为例,请求数据会先拷贝到系统内核空间中,再从操作系统的内核空间拷贝到应用程序的用户空间中。从内核空间将数据拷贝到用户空间过程中,会经历两个阶段:
- 等待数据准备
- 拷贝数据
因为有这两个阶段,所以就有了各种网络IO的模型:
同步编程:应用程序等待IO结果(比如等待打开一个大的文件,或者等待远端服务器的响应),阻塞当前线程。
- 优点:逻辑简单。
- 缺点:效率太低,其他与IO无关的业务也要等待IO的响应。
异步多线程/进程:将IO操作频繁的逻辑、或者单纯的IO操作独立到一/多个线程中,业务线程与IO线程间靠通信/全局变量来共享数据。
- 优点:充分利用CPU资源,防止阻塞资源。
- 缺点:线程切换代价相对较高,异步逻辑代码复杂。
异步消息+回调函数:设计一个消息循环处理器,接收外部消息(包括系统通知和网络报文等),收到消息时调用注册的回调函数。
- 优点:充分利用CPU资源,防止阻塞资源。
- 缺点:代码逻辑复杂。
而协程,就是用同步的语义去解决异步问题,即业务逻辑看起来是同步的,但实际上并不阻塞当前线程(一般是靠事件循环处理来分发消息)。所以协程实际上是在单线程的环境下实现的应用程序级别的并发,就是把本来由操作系统控制的切换+保存状态在应用程序里面实现了。
由于协程在应用程序级别来处理任务,所以协程更像是一个函数,只是比普通的函数多了两个动作:yield()
和resume()
,即让出和恢复。让出的时候我们需要将寄存器的协程上下文保存起来,恢复的时候再将上下文重新压入寄存器,继续执行。
简介
Libtask 是一个简单的协程库,它展示了最简单的一种协程实现方式。操作系统只能看见一个内核线程,无法感知到客户端协程的存在。
Libtask中的协程是协作式的,也就是说,使用的不是时间片轮转算法,调度器根据先来先服务的策略来执行就绪队列中的协程,只有当每个协程主动退出时调度器才会把CPU分配给下一个协程。
对协程的抽象
在libtask中,协程被抽象成一个Task结构体,结构体中的字段用于描述协程的相关信息:
// 一个Task可以看成是一个需要异步执行的任务,coroutine的抽象描述 struct Task { char name[256]; char state[256]; // 前后指针 Task *next; Task *prev; Task *allnext; Task *allprev; // 执行上下文 Context context; // 睡眠时间 uvlong alarmtime; uint id; // 协程栈指针 uchar *stk; // 协程栈大小 uint stksize; // 协程是否退出了 int exiting; // 在在alltask的中的索引下标 int alltaskslot; // 是否是系统协程 int system; // 是否在就绪状态 int ready; // Task需要执行的函数 void (*startfn)(void*); // startfn的参数 void *startarg; // 自定义数据 void *udata; };
创建协程
int taskcreate(void (*fn)(void*), void *arg, uint stack) { int id; Task *t; // 分配task和stack的空间 t = taskalloc(fn, arg, stack); // 协程的数量+1 taskcount++; id = t->id; if(nalltask%64 == 0){ alltask = realloc(alltask, (nalltask+64)*sizeof(alltask[0])); if(alltask == nil){ fprint(2, "out of memory\n"); abort(); } } // 记录位置 t->alltaskslot = nalltask; // 保存到alltask中 alltask[nalltask++] = t; // 修改状态为就绪,可以被调度,并且加入到就绪队列 taskready(t); return id; }
我们可以使用taskcreate
函数来创建协程,在taskcreate
函数中,首先会调用taskalloc
函数为Task和执行栈分配内存,然后初始化协程的上下文信息,在此之后,一个协程就被创建成功了。
/* * taskalloc函数的主要逻辑是申请Task结构体所需的内存和执行时栈的内存, * 然后初始化各个字段。在此之后,一个协程就被创建成功了,接着执行taskready * 把协程加入就绪队列中。 * */ static Task* taskalloc(void (*fn)(void*), void *arg, uint stack) { Task *t; sigset_t zero; uint x, y; ulong z; /* allocate the task and stack together */ // 结构体本身的大小和栈大小 // 协程栈大小是256*1024 t = malloc(sizeof *t+stack); if(t == nil){ fprint(2, "taskalloc malloc: %r\n"); abort(); } memset(t, 0, sizeof *t); // 栈的内存位置 t->stk = (uchar*)(t+1); // 栈大小 t->stksize = stack; // 协程id t->id = ++taskidgen; // 协程工作函数和参数 t->startfn = fn; t->startarg = arg; /* do a reasonable initialization */ memset(&t->context.uc, 0, sizeof t->context.uc); sigemptyset(&zero); // 初始化uc_sigmask字段为空,即不阻塞信号 sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask); /* must initialize with current context */ // 初始化uc字段 if(getcontext(&t->context.uc) < 0){ fprint(2, "getcontext: %r\n"); abort(); } /* call makecontext to do the real work. */ /* leave a few words open on both ends */ // 设置协程执行时的栈位置和大小 t->context.uc.uc_stack.ss_sp = t->stk+8; t->context.uc.uc_stack.ss_size = t->stksize-64; #if defined(__sun__) && !defined(__MAKECONTEXT_V2_SOURCE) /* sigh */ #warning "doing sun thing" /* can avoid this with __MAKECONTEXT_V2_SOURCE but only on SunOS 5.9 */ t->context.uc.uc_stack.ss_sp = (char*)t->context.uc.uc_stack.ss_sp +t->context.uc.uc_stack.ss_size; #endif /* * All this magic is because you have to pass makecontext a * function that takes some number of word-sized variables, * and on 64-bit machines pointers are bigger than words. */ //print("make %p\n", t); z = (ulong)t; y = z; z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */ x = z>>16; // 保存信息到uc字段 makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x); return t; }
创建好一个协程之后,taskcreate
函数会调用taskready()
函数把协程的状态修改为就绪态,并加入到就绪队列中。
/* * 修改协程的状态并加入就绪队列 * */ void taskready(Task *t) { t->ready = 1; addtask(&taskrunqueue, t); }
如何保存上下文信息
我们可以发现,在调用taskalloc
函数初始化协程的时候,我们还会对协程的上下文进行初始化,以下代码的流程是将当前CPU寄存器的上下文信息保存到当前Task的上下文中,同时将当前Task的栈位置和大小保存进上下文中,最后将协程的工作函数保存到上下文信息中。
// 将上下文置为零值 memset(&t->context.uc, 0, sizeof t->context.uc); // 将信号集zero清空 sigemptyset(&zero); // 初始化uc_sigmask字段为空,即不阻塞信号 sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask); // 将当前上下文信息保存到t-context.uc结构体中 if(getcontext(&t->context.uc) < 0){ fprint(2, "getcontext: %r\n"); abort(); } // 设置协程执行时的栈位置和大小 t->context.uc.uc_stack.ss_sp = t->stk+8; t->context.uc.uc_stack.ss_size = t->stksize-64; z = (ulong)t; y = z; z >>= 16; x = z>>16; // 设置协程的工作函数到上下文信息中 makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);
ucontext族函数
其实对协程上下文的初始化以及保存是通过linux下的ucontext族函数来实现的。
ucontext_t结构体
我们发现Task结构体中有一个Context字段,这个字段其实就是对ucontext_t
结构体的封装,ucontext_t
结构体用于保存当前的上下文信息,它的结构是这样的:
typedef struct ucontext { unsigned long int uc_flags; struct ucontext *uc_link;//后序上下文 __sigset_t uc_sigmask;// 信号屏蔽字掩码 stack_t uc_stack;// 上下文所使用的栈 // 保存的上下文的寄存器信息 // 比如pc、sp、bp // pc程序计数器:记录下一条指令的地址 // sp堆栈指针:指向函数调用栈栈顶的指针,所以新数据入栈将存入sp+1的地址 // bp基址指针:指向函数调用栈的首地址 mcontext_t uc_mcontext; long int uc_filler[5]; } ucontext_t; //其中mcontext_t 定义如下 typedef struct { gregset_t __ctx(gregs);//所装载寄存器 fpregset_t __ctx(fpregs);//寄存器的类型 } mcontext_t; //其中gregset_t 定义如下 typedef greg_t gregset_t[NGREG];//包括了所有的寄存器的信息
getcontext()函数
函数原型:
int getcontext(ucontext_t* ucp)
getcontext()
函数的底层是通过汇编来实现的,其主要的功能是将当前运行到的寄存器信息保存到参数ucp中。
setcontext()函数
函数原型:
int setcontext(const ucontext_t *ucp)
setcontext()
函数的作用是将ucontext_t
结构体变量ucp中的上下文信息重新恢复到cpu中并执行。
makecontext()函数
函数原型:
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...)
makecontext()
函数的主要功能是设置协程的工作函数到上下文(ucontext_t)中,同时在用户设置的栈上保存一些信息,并且设置栈顶指针的值到上下文信息中。
argc是入口函数的参数个数,后面的...
是具体的入口函数参数,该参数必须是整形值。
swapcontext()函数
函数原型:
int swapcontext(ucontext_t *oucp, ucontext_t *ucp)
该函数可以将当前cpu中的上下文信息保存到oucp
结构体变量中,然后将ucp
结构体的上下文信息恢复到cpu中。
这里可以理解为调用了两个函数,第一次是调用了getcontext(oucp)
然后再调用setcontext(ucp)
。
协程的调度
在使用taskcreate
创建协程的时候,这个函数内部会调用taskready
函数修改新建协程的状态并加入就绪队列taskrunqueue
中,taskrunqueue
中的协程需要一个调度器来调度执行。
tasklib库中实现了一个协程调度中心的函数。调度中心会不断的从就绪队列中取出协程来执行,它的核心逻辑是这样的:
- 从就绪队列中拿出一个协程t,并把t移出就绪队列。
- 通过
contextswitch
函数将协程t的上下文信息切换到taskschedcontext
中执行。 - 将协程t切换回调度中心,如果t已经退出,修改数据结构,然后回收他的内存,然后继续调度其它的协程执行。这里的调度机制比较简单,是非抢占式的协作式调度,没有时间片的概念,一个协程的执行时间由自己决定,放弃执行的权力也是自己控制的,当协程不想执行了可以调用
taskyield()
函数让出cpu。
static void taskscheduler(void) { int i; Task *t; taskdebug("scheduler enter"); for(;;){ // 如果没有就绪态协程了,就退出 if(taskcount == 0) exit(taskexitval); // 从就绪队列中拿出一个协程 t = taskrunqueue.head; if(t == nil){ fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount); exit(1); } // 从就绪队列中删除这个协程 deltask(&taskrunqueue, t); // 将协程状态改为非就绪态 t->ready = 0; // 保存正在执行的协程 taskrunning = t; // 切换次数+1 tasknswitch++; taskdebug("run %d (%s)", t->id, t->name); // 切换到t执行,将当前cpu中的上下文信息保存到taskschedcontext中 // 然后将t->context中的上下文信息恢复到cpu中执行 contextswitch(&taskschedcontext, &t->context); // 执行结束 taskrunning = nil; // 刚才执行的协程t退出了 if(t->exiting){ // 如果不是系统协程,协程个数减一 if(!t->system) taskcount--; // 保存当前协程在alltask的索引 i = t->alltaskslot; // 将最后一个协程切换到当前协程的位置,因为当前协程要退出了 alltask[i] = alltask[--nalltask]; // 更新被置换协程的索引 alltask[i]->alltaskslot = i; // 释放堆内存 free(t); } } }
从上述调度器执行的代码中可以发现,我们使用contextswitch
函数实现了协程间的上下文切换,这个函数的内部调用了swapcontext(&from->uc, &to->uc)
函数,这个函数将当前cpu中的上下文信息保存到from结构体变量中,然后将to结构体的上下文信息恢复到cpu中执行。
执行结束之后,会重新将上下文切换回调度中心。
static void contextswitch(Context *from, Context *to) { if(swapcontext(&from->uc, &to->uc) < 0){ fprint(2, "swapcontext failed: %r\n"); assert(0); } }
其实我们还可以通过调用taskyield
函数来控制协程在没有执行完就主动让出,那么当前正在执行的task会被 插入就绪队列的尾部,等待后续的调度,然后调度器会从就绪队列的头部重新取出一个task来执行。
/* * 协程主动让出CPU * 1.将主动让出的协程重新加入到就绪队列 * 2.将当前协程的状态标记为让出 * 3.执行协程切换的逻辑 * */ int taskyield(void) { int n; // 协程的让出次数 n = tasknswitch; // 将当前主动让出的协程放进等待队列 taskready(taskrunning); // 标记当前协程的状态为“让出” taskstate("yield"); // 切换协程 taskswitch(); // 等于0说明当前只有自己一个协程,调度的时候taskswitch加1,所以这里要减1 return tasknswitch - n - 1; }
我们可以发现,切换流程的时候实际上是调用了taskswitch()
函数,这个函数内部会调用contextswitch
函数来切换上下文。
/* * 切换协程 * */ void taskswitch(void) { needstack(0); // 将当前CPU中的上下文信息保存到taskrunning->context结构体中 // 然后将调度中心上下文恢复到CPU中执行 contextswitch(&taskrunning->context, &taskschedcontext); }
总结
所以整个调度流程是这样的:
每一个协程对应一个Task结构体。然后调度中心不断地按照先进先出的方式去调度协程的执行就可以。因为没有抢占机制,所以调度中心是依赖协程本身去驱动的,协程需要主动让出cpu,把上下文切换回调度中心,调度中心才能进行下一轮的调度。
当然我们也可以调用taskyield
函数主动让出CPU,他会将当前正在执行的task插入就绪队列的尾部,等待后续的调度,然后调度器会从就绪队列的头部重新取出一个task来执行。
到此这篇关于GoLang协程库libtask学习笔记的文章就介绍到这了,更多相关GoLang libtask内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!