详解Go语言如何实现一个最简化的协程池
作者:yesAnd92
背景
学习完优秀的协程池开源项目ants之后,想要对协程池做了一个总结,在ants的基础上做了简化,保留了一个协程池的核心功能,旨在帮助协程池的理解和使用。
为什么要用协程池
- 降低资源开销:协程池可以重复使用协程,减少创建、销毁协程的开销,从而提高系统性能。
- 提高响应速度:接收任务后可以立即执行,无需等待创建协程时间。
- 增强可管理型:可以对协程进行集中调度,统一管理,方便调优。
实现协程池都需要实现哪些功能
思考一下,如果如果让你实现一个协程池,有哪些必要的核心功能呢?
- 协程池需要提供一个对外接收任务提交的接口
- 如何创建一个协程,创建好的协程存放在哪里?
- 协程池中的协程在没有task时,是如何存在的?
- 如何为使用者提交的task分配一个协程?
- 协程执行完task后,如何返还到协程池中?
协程池内部需要维护一个装有一个个协程的队列,用于存放管理的协程,为了拓展功能方便,我们把每个协程都封装一个worker,这个worker队列需要具备几个核心功能:
协程池整体的架构
带着这些如何实现一个简单协程池必要核心功能的问题,我们来看下,一个协程池的核心流程,用图来表示就是:
从图上可以看出协程池主要包括3个组件:
协程池(gorutine-pool) :它是整个协程池的入口和主体,内部持有一个协程队列,用于存放、调度worker。
协程队列(worker-queue) :持有协程池维护的所有的协程,为了拓展方便,将协程封装成worker,一个worker对应一个协程。
worker:每个worker对应一个协程,它能够运行一个任务,通常是个函数,是真正干活的地方。
主要流程:
当一个使用者一个task提交后,协程池从workerQueue中获取一个可用的worker负责执行此task,如果worker队列中没有可用的worker,并且worker的数量还没有达到队列设置最大数量,可以新建一个worker补充到队列中,worker执行完任务后,还需要能够将自己返还到workder队列中,才能达到复用的目的
三个组件的实现
分别来看下三个组件是如何实现协程池的
gorutine-pool实现
// Pool pool负责worker的调度 type Pool struct { //pool最大最大线程数量 cap int32 //当前运行的worker数量 running int32 //worker队列 workers workerQueue //控制并发访问临界资源 lock sync.Mutex }
pool结构体中cap
和running
两个属性用来管理协程池的数量,workers
存放创建的协程,lock
控制并发访问临界资源。
从上述的架构图中可以看出,pool需要对外提供接收task的方法,以及两个内部从workerQueue获取worker、返还worker到workerQueue的方法。
Submit
// Submit 提交任务 func (p *Pool) Submit(f func()) error { if worker := p.retrieveWorker(); worker != nil { worker.inputFunc(f) return nil } return ErrPoolOverload }
Submit()是给调用者提交task任务的方法,它的入参是一个函数,这个函数就是协程池使用者想让协程池执行的内容。协程池pool会尝试为这个task分配一个worker来处理task,但是,如果协程池的worker都被占用,并且有数量限制无法再创建新的worker,pool也无能为力,这里会返回给调用者一个"过载"的异常,当然这里可以拓展其它的拒绝策略。
retrieveWorker()
//从workerQueue中获取一个worker func (p *Pool) retrieveWorker() worker { p.lock.Lock() w := p.workers.detach() if w != nil { p.lock.Unlock() } else { //没有拿到可用的worker //如果容量还没耗尽,再创建一个worker if p.running < p.cap { w = &goWorker{ pool: p, task: make(chan func()), } w.run() } p.lock.Unlock() } return w }
retrieveWorker()
是从pool中的workerQueue中获取一个worker具体实现。获取worker是一个并发操作,这里使用锁控制并发。调用workers.detach()
从workerQueue中获取worker,如果没有拿到可用的worker,这时候还需要看看目前pool中现存活的worker数量是否已经达到上限,未达上限,则可以创建新的worker加入到pool中。
revertWorker()
// 执行完任务的worker返还到workerQueue func (p *Pool) revertWorker(w *goWorker) bool { defer func() { p.lock.Unlock() }() p.lock.Lock() //判断容量,如果协程存活数量大于容量,销毁 if p.running < p.cap { p.workers.insert(w) return true } return false }
返还当前worker到pool是在worker执行完task之后,返回时需要判断当前存活的worker数量是否到达pool的上限,已达上限则返回失败。另外,由于running
属性存在并发访问的问题,返还操作也需要加锁。
workerQueue实现
为了提高拓展性,我们将workerQueue抽象成接口,也就是说可以有多种协程队列实现来适配更多的使用场景
workerQueue接口
// 定义一个协程队列的接口 type workerQueue interface { //队列长度 len() int //插入worker insert(w worker) //分派一个worker detach() worker }
插入insert()
和分配detach()
是实现协程队列的核心方法。这里我们以底层基于“栈”思想的结构作为workerQueue的默认实现,也即后进入队列的协程优先被分配使用。
// 底层构造一个栈类型的队列来管理多个worker type workerStack struct { items []worker }
我们使用数组来存放worker,用数组来模拟先进后出的协程队列
// 新创建一个worker func (ws *workerStack) insert(w worker) { ws.items = append(ws.items, w) }
workerStack的insert()
实现很简单,直接在数组尾巴追加一个worker
// 分配一个worker func (ws *workerStack) detach() worker { l := ws.len() if l == 0 { return nil } w := ws.items[l-1] ws.items[l-1] = nil ws.items = ws.items[:l-1] return w }
detach()
负责从数组中获取一个可用的空闲worker,每次获取时取用的是数组的最后一个元素,也就是协程队列末尾的worker优先被分配出去了。
注意这里将下标l-1
位置的对象置为nil,可以防止内存泄露
worker实现
type worker interface { workId() string run() //接收函数执行任务 inputFunc(func()) } type goWorker struct { workerId string //需要持有自己所属的 Pool 因为要和它进行交互 pool *Pool task chan func() }
这里同样了为了拓展,将worker抽象成了一个接口,goWorker是它的一个默认实现,worker最核心的工作就是等待着task到来,接到task后执行,task具体来说就是一个函数。这里其实是一个很简单的生产者/消费者模型,我们想到使用管道来实现生产消费模型,定义一个函数类型的管道,你或许要问为什么使用管道,还有别的方式可以实现这个功能么?不急,我们来看看worker要实现什么功能:
- 创建出来的worker,未必马上就有task分配过来执行,它肯定要把自己“阻塞”住,随时等待task到来
- task传递过来终归需要一个介质
鉴于这个场景,使用管道式非常合适的,管道内没有元素时,worker阻塞等待,当管道内有task进来时,worker被唤醒,从管道中取出task进行处理。当然,我们使用一个死循环,不断自旋的从一个容器中读取task,也能达到同样的目的,但却没有使用管道合适、优雅!
func (g *goWorker) run() { go func() { defer func() { atomic.AddInt32(&g.pool.running, -1) }() //running+1 atomic.AddInt32(&g.pool.running, 1) for { select { case f := <-g.task: if f == nil { return } //执行提交的任务 f() } //worker返还到queue中 if ok := g.pool.revertWorker(g); !ok { return } } }() } func (g *goWorker) inputFunc(f func()) { g.task <- f }
run()
是worker的核心方法,worker通常被创建后就会调用run()
,一起来看下主要做了那些内容:
- 使用select 监听task管道,阻塞当前协程,回到管道内有task进入
- 监听到task,则会执行task的内容
- task执行完毕后,会调用
poo.revertWorker()
将当前worker返还到协程池中待用,当然这里未必会返成功。
很容易忽略的一个点是:为什么要新启动一个协程来完成以上工作?因为worker中没有task时,要阻塞等待任务,如果不是在一个新的协程中,整个程序都阻塞在第一个worker的run()
中,所谓协程池,就是指每个worker对应的这个协程。
另外,pool维护一个running
属性来表示存活的worker数量,当调用run()
方法后,表示worker是可用的了,running
值+1。如果worker返回协程池失败,run()
执行完毕,worker对应的协程被系统销毁,表示当前worker生命周期结束了,对应的写成会将running
值-1。由于多个worker并发修改running
值,使用了atomic.AddInt32
控制临界资源的修改。
至此,实现一个简单协程池的核心的功能都已经完成,和ants相比,这是一个相当精简的协程池,旨在帮助我们加深对协程池、线程池这类组件模型的理解,离真正可用有段距离。ants中更完备的功能,比如:ants实现了定期清理空闲worker,以及对锁的优化、worker的池化等等,感兴趣的可以看看ants,短小精悍的开源项目!
到此这篇关于详解Go语言如何实现一个最简化的协程池的文章就介绍到这了,更多相关Go协程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!