Go高级特性探究之协程池详解
作者:tracy小猫
在并发编程中,协程是 Go 语言的核心特性之一,但是在实际应用中,协程的创建和销毁成本比较高。当需要同时处理大量的任务时,创建大量的协程会导致系统开销变大,进而影响程序的性能。这时候,就需要使用协程池来管理协程的生命周期,将协程的创建和销毁成本降至最小,提高程序的并发性能。
本文将介绍如何使用 Go 协程池构造一个协程池,并解决函数传参问题、优雅关闭协程池和保证协程安全的问题。
Pool
type Pool struct { capacity uint64 // 最大协程数 runningWorkers uint64 // 当前正在运行的协程数 status int64 // 协程池的状态 chTask chan *Task // 执行任务的 channel PanicHandler func(interface{}) // 处理协程中的 panic 异常 sync.Once sync.Mutex }
Pool 类型是协程池的主要类型,包含了以下属性:
capacity
:最大协程数。runningWorkers
:当前正在运行的协程数。status
:协程池的状态。chTask
:执行任务的 channel。PanicHandler
:处理协程中的 panic 异常。sync.Once
:防止 Stop 函数被多次调用。sync.Mutex
: 用于锁定协程池的状态和 channel。
同时 Pool 类型包含以下函数:
NewPool
:用于初始化协程池。Submit
:将任务放到 channel 中供协程进行任务处理。createWorker
:用于创建并启动一个协程来执行任务。incRunning
:增加协程池的运行协程数。decRunning
:减少协程池的运行协程数。Stop
:关闭协程池,停止接受任务并且等待所有任务执行完毕后关闭协程池。
NewPool 函数
NewPool 函数用于创建和初始化一个协程池。将最大协程数 n 和处理协程中 panic 异常的函数 panicHandler 传入函数中,创建一个 Pool 类型,并将属性初始化后返回一个 Pool 的指针类型。
func NewPool(n uint64, panicHandler func(interface{})) *Pool { return &Pool{ capacity: n, status: Running, chTask: make(chan *Task, n), PanicHandler: panicHandler, } }
Submit 函数
Submit 函数用于将任务放到 channel 中供协程进行任务处理。首先判断协程池状态是否为 Stopped,如果已经关闭,则返回一个错误;接着加锁,并判断 channel 中是否已满,如果已经满了,则返回一个错误,否则将任务放到 channel 中并返回 nil。
// 将任务放到 channel 中供协程进行任务处理 func (p *Pool) Submit(t *Task) error { if p.status == Stopped { return errors.New("协程池已关闭,不能提交任务") } p.Lock() defer p.Unlock() if len(p.chTask) == int(p.capacity) { return errors.New("协程池已满,不能接受新任务") } p.chTask <- t return nil }
createWorker 函数
createWorker 函数用于创建并启动一个协程来执行任务。首先增加当前运行的协程数,然后在一个 go 协程内执行任务。如果在执行任务的过程中出现 panic 异常,则调用 PanicHandler
处理函数,如果没有设置 PanicHandler
处理函数,则直接将异常信息打印出来。执行完任务后,减少当前运行的协程数。
// 初始化协程池的协程数量 func (p *Pool) createWorker() { p.incRunning() // 每一个协程获取一个任务,执行任务 go func() { defer func() { if r := recover(); r != nil { if p.PanicHandler != nil { p.PanicHandler(r) } else { fmt.Println("Panic:", r) } } p.decRunning() }() for { select { case t := <-p.chTask: if t == nil { return } t.Handler(t.Params...) } } }() }
incRunning、decRunning 函数
incRunning、decRunning 函数用于增加和减少协程池的运行协程数,使用了 atomic.AddUint64 函数来保证操作的原子性。
// 增加协程池的运行协程数 func (p *Pool) incRunning() { atomic.AddUint64(&p.runningWorkers, 1) } // 减少协程池的运行协程数 func (p *Pool) decRunning() { atomic.AddUint64(&p.runningWorkers, ^uint64(0)) }
Stop 函数
Stop 函数用于关闭协程池,停止接受任务并且等待所有任务执行完毕后关闭协程池。首先判断协程池状态是否为 Running,如果已经关闭,则直接返回;接着将协程池状态设置为 Stopped,然后使用 sync.Once 确保关闭 channel 的操作仅被执行一次,同时创建运行的协程数个协程,等待它们执行完毕后关闭协程池。
// 关闭协程池 func (p *Pool) Stop() { if p.status == Running { p.status = Stopped p.Once.Do(func() { close(p.chTask) for i := uint64(0); i < p.runningWorkers; i++ { p.createWorker() } }) } }
解决函数传参问题
在使用协程池时,需要向协程池提交任务,但是协程池内部的协程如何知道要执行什么样的任务,参数又应该如何传递呢?
为了解决这个问题,可以定义一个 Task 结构体,用于存储要执行的函数和函数参数,如下所示:
type Task struct { Handler func(v ...interface{}) Params []interface{} }
Task 类型是一个结构体,用于封装协程池的任务。其中 Handler 是一个函数类型,用于任务执行的函数;Params 是一个可变参数,调用 Handler 时传递给它的参数。
其中,Handler 是一个无返回值的函数,且该函数可接受变长参数,Params 是一个任意类型的切片,用于传递函数的参数列表。
在向协程池提交任务时,可以将 Task 对象作为参数进行提交。
pool.Submit(&Task{ Handler: func(v ...interface{}) { // 执行任务的代码 }, Params: []interface{}{...}, // 任务的参数列表 })
在协程内部,可以通过调用 Task.Handler 方法,并将 Task.Params 作为参数传递进去,来运行具体的任务。
select { case t := <-p.chTask: if t == nil { return } t.Handler(t.Params...) }
通过这种方式,协程池就能够动态地执行不同的任务,并且传递任意类型和数量的参数。
优雅关闭协程池
在使用协程池时,如何正确地关闭协程池,以避免因未正确关闭而导致的内存泄漏和程序崩溃呢?
首先,需要明确协程池的运行状态,通过内部的 status
参数控制协程池的开关。当协程池处于运行状态时,协程池才能够接受新的任务,否则应该拒绝新的任务请求,并尽快释放内部的资源。
其次,在关闭协程池时,需要确保所有的已运行的协程都已经执行完任务并退出。这时,可以使用 sync.Once 来执行一次协程池的清理工作。当协程池处于关闭状态时,不再接受新的任务,并通知所有的协程退出任务循环,最终实现协程池的优雅关闭。
func (p *Pool) Stop() { if p.status == Running { p.status = Stopped p.Once.Do(func() { close(p.chTask) for i := uint64(0); i < p.runningWorkers; i++ { p.createWorker() } }) } }
保证协程安全
在使用协程池时,需要注意线程安全问题,尤其是在多个协程同时访问协程池时,需要保证协程池的内部状态是线程安全的。
同时对于状态的变更以及数量的增减,还需要保证代码的安全性。
为了保证线程安全,可以使用互斥锁 sync.Mutex 来锁定协程池,以避免多个协程同时读写协程池的运行状态和其他内部参数。
在协程池的内部实现中,使用的 sync.Once
只会单次执行的特性可以保证协程池只会初始化一次,防止因多次初始化而导致的内存泄漏或其他异常。
测试用例
为了测试协程池的正确性,以下是一个简单的测试用例。该测试用例创建一个容量为 3 的协程池,并向其中提交 10 个任务,每个任务随机睡眠一段时间,并输出当前时间。
package main import ( "fmt" "math/rand" "sync" "testing" "time" ) func TestPool(t *testing.T) { pool := NewPool(3, func(err interface{}) { fmt.Println("发生 panic,错误信息:", err) }) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() task := \&Task{ Handler: func(v ...interface{}) { fmt.Printf("任务 %d 开始执行,时间:%v\n", id, time.Now().Format("2006-01-02 15:04:05")) rand.Seed(time.Now().UnixNano()) time.Sleep(time.Duration(rand.Intn(5)) \* time.Second) fmt.Printf("任务 %d 执行完毕,时间:%v\n", id, time.Now().Format("2006-01-02 15:04:05")) }, Params: \[]interface{}{}, } pool.Submit(task) }(i) } wg.Wait() }
输出结果如下:
任务 0 开始执行,时间:2021-10-05 16:52:22
任务 1 开始执行,时间:2021-10-05 16:52:22
任务 2 开始执行,时间:2021-10-05 16:52:22
任务 0 执行完毕,时间:2021-10-05 16:52:27
任务 3 开始执行,时间:2021-10-05 16:52:27
任务 4 开始执行,时间:2021-10-05 16:52:27
任务 1 执行完毕,时间:2021-10-05 16:52:28
任务 5 开始执行,时间:2021-10-05 16:52:28
任务 6 开始执行,时间:2021-10-05 16:52:28
任务 7 开始执行,时间:2021-10-05 16:52:28
任务 4 执行完毕,时间:2021-10-05 16:52:29
任务 8 开始执行,时间:2021-10-05 16:52:29
任务 9 开始执行,时间:2021-10-05 16:52:29
任务 2 执行完毕,时间:2021-10-05 16:52:32
任务 5 执行完毕,时间:2021-10-05 16:52:33
任务 7 执行完毕,时间:2021-10-05 16:52:33
任务 6 执行完毕,时间:2021-10-05 16:52:34
任务 3 执行完毕,时间:2021-10-05 16:52:35
任务 9 执行完毕,时间:2021-10-05 16:52:35
任务 8 执行完毕,时间:2021-10-05 16:52:37
从输出结果可以看出,协程池成功并行处理了所有的任务,并且在容量限制的情况下,成功地保证了协程池的线程安全性。
改进
可考虑增加对协程池容量的动态调整算法,例如在高峰期时增加协程池的容量,低谷期时降低协程池的容量。另外可以增加协程池的超时控制机制,以避免任务执行时间过长导致系统资源浪费和性能下降。
总结
协程池是 Go 语言中一种重要的并发编程模式,通过协程池可以高效地管理协程的生命周期、避免协程的频繁创建和销毁,提高程序的并发性能。在使用协程池时,需要注意解决函数传参问题、优雅关闭协程池和保证协程安全的问题,通过合理使用互斥锁和 sync.Once 可以有效解决这些问题,从而保证协程池的正确性和高效性。
以上就是Go高级特性探究之协程池详解的详细内容,更多关于Go协程池的资料请关注脚本之家其它相关文章!