Go底层之singleflight包原理分析
作者:在成都搬砖的鸭鸭
这篇文章主要介绍了Go底层之singleflight包原理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
背景
在处理同一时刻接口的并发请求时,常见的有这几种情况:
一个请求正在执行,相同的其它请求等待顺序执行,使用互斥锁就能完成、一个请求正在执行,相同的其它请求都丢弃、一个请求正在执行,相同的其它请求等待拿取相同的结果。
使用singleflight包就能达到一个请求正在执行,相同的其它请求过来等待第一个请求执行完,然后共享第一个请求的结果,在处理并发场景时非常好用。
下载
go get -u golang.org/x/sync/singleflight
原理解释
- singleflight底层结构:
type Group struct { mu sync.Mutex //保护map对象m的并发安全 m map[string]*call //key-请求的唯一标识,val-请求唯一标识对应要执行的函数 }
- call底层结构:
type call struct { wg sync.WaitGroup //用来阻塞相同标识对应的请求中的第一个请求之外的请求 val interface{} //第一个请求的执行结果 err error //第一个请求返回的错误 dups int //第一个请求之外的其它请求数 chans []chan<- Result //请求结果写入通道 }
- 关键函数:
// // Do // @Description: 一个唯一标识对应的请求在执行过程中,相同唯一标识对应的请求会被阻塞,等待第一个请求执行完并共享结果 // @receiver g // @param key 请求唯一标识 // @param fn 请求要执行的函数 // @return v 请求要执行函数返回的结果 // @return err 请求要执行的函数返回的错误 // @return shared 是否有多个请求共享结果 // func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() //保护map对象m的并发安全 if g.m == nil { g.m = make(map[string]*call) //初始化m对象 } if c, ok := g.m[key]; ok { //map中key存在说明这个key对应的请求正在执行中,这次请求不是第一个请求 c.dups++ //等待共享结果数+1 g.mu.Unlock() c.wg.Wait() //阻塞等待第一个请求执行完 if e, ok := c.err.(*panicError); ok { panic(e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true //返回第一个请求的执行结果 } //第一个请求的处理逻辑 c := new(call) c.wg.Add(1) //计数+1 g.m[key] = c //唯一标识关联对应的函数对象 g.mu.Unlock() g.doCall(c, key, fn) //执行请求对应的函数 return c.val, c.err, c.dups > 0 //返回执行结果 }
上面Do函数中g.doCall函数也需要大概理解一下,就是会将key对应函数的执行结果写的call对象c里,然后清空wg计数,相同key对应的其它请求就会跳出c.wg.Wait()阻塞,直接从call对象c中读取第一个请求的执行结果和错误信息并返回,源码如下:
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { normalReturn := false recovered := false defer func() { //fn函数执行完之后执行 if !normalReturn && !recovered { c.err = errGoexit } g.mu.Lock() defer g.mu.Unlock() c.wg.Done() //释放计数 if g.m[key] == c { delete(g.m, key) //删除此次请求的唯一标识相关信息 } if e, ok := c.err.(*panicError); ok { if len(c.chans) > 0 { go panic(e) select {} } else { panic(e) } } else if c.err == errGoexit { // Already in the process of goexit, no need to call again } else { for _, ch := range c.chans { //执行结果写入通道 ch <- Result{c.val, c.err, c.dups > 0} } } }() func() { defer func() { if !normalReturn { if r := recover(); r != nil { c.err = newPanicError(r) } } }() c.val, c.err = fn() //执行fn函数 normalReturn = true }() if !normalReturn { recovered = true } }
singleflight中还提供了与Do函数功能相同的函数DoChan函数,唯一区别就是将请求对应的函数执行结果放到通道中进行返回,这两函数一个用于同步场景,一个用于异步场景。还有一个Forget函数:
func (g *Group) Forget(key string) { g.mu.Lock() delete(g.m, key) //删除map中的key,相同key对应请求进来会重新执行,不等待第一个key对应请求的执行结果 g.mu.Unlock() }
代码示例
- 示例如下:
func main() { var singleFlight singleflight.Group //初始化一个单次执行对象 var count uint64 //用于测试是否被修改 //为了并发执行,这里测试唯一标识都为xxx go func() { val1, _, shared1 := singleFlight.Do("xxx", func() (interface{}, error) { logger.Info("first count +1") atomic.AddUint64(&count, 1) //第一次执行,将count+1 time.Sleep(5 * time.Second) //增加第一次执行时间 return count, nil }) //打印第一次执行结果 logger.Info("first count info", zap.Any("val1", val1), zap.Bool("shared1", shared1), zap.Uint64("count", count)) }() time.Sleep(2 * time.Second) //为了防止下面的Do函数先执行 val2, _, shared2 := singleFlight.Do("xxx", func() (interface{}, error) { logger.Info("second count +1") atomic.AddUint64(&count, 1) //第2次执行count+1 return count, nil }) //打印第二次执行结果 logger.Info("second count info", zap.Any("val2", val2), zap.Bool("shared2", shared2), zap.Uint64("count", count)) }
- 控制台输出:
$ go run ./singlefight_demo/main.go [2025-01-09 17:08:11.169] | INFO | Goroutine:6 | [singlefight_demo/main.go:19] | first count +1 [2025-01-09 17:08:16.261] | INFO | Goroutine:6 | [singlefight_demo/main.go:28] | first count info | {"val1": 1, "shared1": true, "count": 1} [2025-01-09 17:08:16.261] | INFO | Goroutine:1 | [singlefight_demo/main.go:41] | second count info | {"val2": 1, "shared2": true, "count": 1}
总结
看singleflight原码之后,要实现一个请求正在执行,相同的其它请求进来时直接报错的功能也很简单,将singleflight中等待第一个请求的逻辑改为直接返回错误就可以。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。