Go并发原语之SingleFlight请求合并方法实例
作者:fliyu
SingleFlight 的使用场景
在处理多个 goroutine 同时调用同一个函数的时候,如何只用一个 goroutine 去调用一次函数,并将返回结果给到所有 goroutine,这是可以使用 SingleFlight,可以减少并发调用的数量。
在高并发请求场景中,例如秒杀场景:多个用户在同一时间查询库存数,这时候对于所有的用户而言,同一时间查询结果都是一样的,如果后台都去查缓存或者数据库,那么性能压力很大。如果相同时间只有一个查询,那么性能将显著提升。
一句话总结:SingleFlight 主要作用是合并并发请求的场景,针对于相同的读请求。
SingleFlight 的基本使用
下面先看看这段代码,5个协程同时并发返回 getProductById ,看看输出结果如何:
func main() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go func() { defer wg.Done() result := getProductById("商品A") fmt.Printf("%v\n", result) }() } wg.Wait() } func getProductById(name string) string { fmt.Println("getProductById doing...") time.Sleep(time.Millisecond * 10) // 模拟一下耗时 return name }
$ go run main.go getProductById doing... getProductById doing... getProductById doing... getProductById doing... getProductById doing... 商品A 商品A 商品A 商品A 商品A
可以看出 getProductById 方法被访问了五次,那么如何通过 SingleFlight 进行优化呢?
定义一个全局变量 SingleFlight,在访问 getProductById 方法时调用 Do 方法,即可实现同一时间只有一次方法,代码如下:
import ( "fmt" "golang.org/x/sync/singleflight" "sync" "time" ) var g singleflight.Group func main() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go func() { defer wg.Done() resp, _, _ := g.Do("商品A", func() (interface{}, error) { result := getProductById("商品A") return result, nil }) fmt.Printf("%v\n", resp) }() } wg.Wait() } func getProductById(name string) string { fmt.Println("getProductById doing...") time.Sleep(time.Millisecond * 10) // 模拟一下耗时 return name }
$ go run main.go getProductById doing... 商品A 商品A 商品A 商品A 商品A
你可能会想 SingleFlight 和 sync.Once 的区别,sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并请求中,针对于同一时间的并发场景。
SingleFlight 的实现原理
SingleFlight 的数据结构是 Group ,结构如下:
// call is an in-flight or completed singleflight.Do call type call struct { wg sync.WaitGroup // These fields are written once before the WaitGroup is done // and are only read after the WaitGroup is done. val interface{} err error // These fields are read and written with the singleflight // mutex held before the WaitGroup is done, and are read but // not written after the WaitGroup is done. dups int chans []chan<- Result } // Group represents a class of work and forms a namespace in // which units of work can be executed with duplicate suppression. type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized } // Result holds the results of Do, so they can be passed // on a channel. type Result struct { Val interface{} Err error Shared bool }
可以看出,SingleFlight 是使用互斥锁 Mutex 和 Map 来实现的。互斥锁 Mutex 提供并发时的读写保护,而 Map 用于保存同一个 key 正在处理的请求。
其提供了3个方法:
Do 方法的实现逻辑
// Do executes and returns the results of the given function, making // sure that only one execution is in-flight for a given key at a // time. If a duplicate comes in, the duplicate caller waits for the // original to complete and receives the same results. // The return value shared indicates whether v was given to multiple callers. func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() if e, ok := c.err.(*panicError); ok { panic(e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
SingleFlight 定义了一个辅助对象 call,用于代表正在执行 fn 函数的请求或者是否已经执行完请求。
- 如果存在相同的 key,其他请求将会等待这个 key 执行完成,并使用第一个 key 获取到的请求结果
- 如果不存在,创建一个 call ,并将其加入到 map 中,执行调用 fn 函数。
DoChan 方法的实现逻辑
而 DoChan 方法与 Do 方法类似:
// DoChan is like Do but returns a channel that will receive the // results when they are ready. // // The returned channel will not be closed. func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock() go g.doCall(c, key, fn) return ch }
Forget 方法的实现逻辑
// Forget tells the singleflight to forget about a key. Future calls // to Do for this key will call the function rather than waiting for // an earlier call to complete. func (g *Group) Forget(key string) { g.mu.Lock() delete(g.m, key) g.mu.Unlock() }
将 key 从 map 中删除。
总结
使用 SingleFlight 时,通过将多个请求合并成一个,降低并发访问的压力,极大地提升了系统性能,针对于多并发读请求的场景,可以考虑是否满足 SingleFlight 的使用情况。
而对于并发写请求的场景,如果是多次写只需要一次的情况,那么也是满足的。例如:每个 http 请求都会携带 token,每次请求都需要把 token 存入缓存或者写入数据库,如果多次并发请求同时来,只需要写一次即可
以上就是Go并发原语之SingleFlight请求合并方法实例的详细内容,更多关于Go SingleFlight 请求合并的资料请关注脚本之家其它相关文章!