Go并发原语之SingleFlight请求合并方法实例
作者:fliyu
SingleFlight 的使用场景
在处理多个 goroutine 同时调用同一个函数的时候,如何只用一个 goroutine 去调用一次函数,并将返回结果给到所有 goroutine,这是可以使用 SingleFlight,可以减少并发调用的数量。
在高并发请求场景中,例如秒杀场景:多个用户在同一时间查询库存数,这时候对于所有的用户而言,同一时间查询结果都是一样的,如果后台都去查缓存或者数据库,那么性能压力很大。如果相同时间只有一个查询,那么性能将显著提升。

一句话总结:SingleFlight 主要作用是合并并发请求的场景,针对于相同的读请求。
SingleFlight 的基本使用
下面先看看这段代码,5个协程同时并发返回 getProductById ,看看输出结果如何:
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
result := getProductById("商品A")
fmt.Printf("%v\n", result)
}()
}
wg.Wait()
}
func getProductById(name string) string {
fmt.Println("getProductById doing...")
time.Sleep(time.Millisecond * 10) // 模拟一下耗时
return name
}$ go run main.go getProductById doing... getProductById doing... getProductById doing... getProductById doing... getProductById doing... 商品A 商品A 商品A 商品A 商品A
可以看出 getProductById 方法被访问了五次,那么如何通过 SingleFlight 进行优化呢?
定义一个全局变量 SingleFlight,在访问 getProductById 方法时调用 Do 方法,即可实现同一时间只有一次方法,代码如下:
import (
"fmt"
"golang.org/x/sync/singleflight"
"sync"
"time"
)
var g singleflight.Group
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, _, _ := g.Do("商品A", func() (interface{}, error) {
result := getProductById("商品A")
return result, nil
})
fmt.Printf("%v\n", resp)
}()
}
wg.Wait()
}
func getProductById(name string) string {
fmt.Println("getProductById doing...")
time.Sleep(time.Millisecond * 10) // 模拟一下耗时
return name
}$ go run main.go getProductById doing... 商品A 商品A 商品A 商品A 商品A
你可能会想 SingleFlight 和 sync.Once 的区别,sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并请求中,针对于同一时间的并发场景。
SingleFlight 的实现原理
SingleFlight 的数据结构是 Group ,结构如下:
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{}
err error
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}可以看出,SingleFlight 是使用互斥锁 Mutex 和 Map 来实现的。互斥锁 Mutex 提供并发时的读写保护,而 Map 用于保存同一个 key 正在处理的请求。
其提供了3个方法:

Do 方法的实现逻辑
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}SingleFlight 定义了一个辅助对象 call,用于代表正在执行 fn 函数的请求或者是否已经执行完请求。
- 如果存在相同的 key,其他请求将会等待这个 key 执行完成,并使用第一个 key 获取到的请求结果
- 如果不存在,创建一个 call ,并将其加入到 map 中,执行调用 fn 函数。
DoChan 方法的实现逻辑
而 DoChan 方法与 Do 方法类似:
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}Forget 方法的实现逻辑
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}将 key 从 map 中删除。
总结
使用 SingleFlight 时,通过将多个请求合并成一个,降低并发访问的压力,极大地提升了系统性能,针对于多并发读请求的场景,可以考虑是否满足 SingleFlight 的使用情况。
而对于并发写请求的场景,如果是多次写只需要一次的情况,那么也是满足的。例如:每个 http 请求都会携带 token,每次请求都需要把 token 存入缓存或者写入数据库,如果多次并发请求同时来,只需要写一次即可
以上就是Go并发原语之SingleFlight请求合并方法实例的详细内容,更多关于Go SingleFlight 请求合并的资料请关注脚本之家其它相关文章!
