Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go SingleFlight请求合并

Go并发原语之SingleFlight请求合并方法实例

作者:fliyu

本文我们来学习一下 Go 语言的扩展并发原语:SingleFlight,SingleFlight 的作用是将并发请求合并成一个请求,以减少重复的进程来优化 Go 代码

SingleFlight 的使用场景

在处理多个 goroutine 同时调用同一个函数的时候,如何只用一个 goroutine 去调用一次函数,并将返回结果给到所有 goroutine,这是可以使用 SingleFlight,可以减少并发调用的数量。

在高并发请求场景中,例如秒杀场景:多个用户在同一时间查询库存数,这时候对于所有的用户而言,同一时间查询结果都是一样的,如果后台都去查缓存或者数据库,那么性能压力很大。如果相同时间只有一个查询,那么性能将显著提升。

一句话总结:SingleFlight 主要作用是合并并发请求的场景,针对于相同的读请求。

SingleFlight 的基本使用

下面先看看这段代码,5个协程同时并发返回 getProductById ,看看输出结果如何:

func main() {
  var wg sync.WaitGroup
  for i := 0; i < 5; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      result := getProductById("商品A")
      fmt.Printf("%v\n", result)
    }()
  }
  wg.Wait()
}
func getProductById(name string) string {
  fmt.Println("getProductById doing...")
  time.Sleep(time.Millisecond * 10) // 模拟一下耗时
  return name
}
$ go run main.go
getProductById doing...
getProductById doing...
getProductById doing...
getProductById doing...
getProductById doing...
商品A
商品A
商品A
商品A
商品A

可以看出 getProductById 方法被访问了五次,那么如何通过 SingleFlight 进行优化呢?

定义一个全局变量 SingleFlight,在访问 getProductById 方法时调用 Do 方法,即可实现同一时间只有一次方法,代码如下:

import (
  "fmt"
  "golang.org/x/sync/singleflight"
  "sync"
  "time"
)
var g singleflight.Group
func main() {
  var wg sync.WaitGroup
  for i := 0; i < 5; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      resp, _, _ := g.Do("商品A", func() (interface{}, error) {
        result := getProductById("商品A")
        return result, nil
      })
      fmt.Printf("%v\n", resp)
    }()
  }
  wg.Wait()
}
func getProductById(name string) string {
  fmt.Println("getProductById doing...")
  time.Sleep(time.Millisecond * 10) // 模拟一下耗时
  return name
}
$ go run main.go
getProductById doing...
商品A
商品A
商品A
商品A
商品A

你可能会想 SingleFlight 和 sync.Once 的区别,sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并请求中,针对于同一时间的并发场景。

SingleFlight 的实现原理

SingleFlight 的数据结构是 Group ,结构如下:

// call is an in-flight or completed singleflight.Do call
type call struct {
  wg sync.WaitGroup
  // These fields are written once before the WaitGroup is done
  // and are only read after the WaitGroup is done.
  val interface{}
  err error
  // These fields are read and written with the singleflight
  // mutex held before the WaitGroup is done, and are read but
  // not written after the WaitGroup is done.
  dups  int
  chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
  mu sync.Mutex       // protects m
  m  map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
  Val    interface{}
  Err    error
  Shared bool
}

可以看出,SingleFlight 是使用互斥锁 Mutex 和 Map 来实现的。互斥锁 Mutex 提供并发时的读写保护,而 Map 用于保存同一个 key 正在处理的请求。

其提供了3个方法:

Do 方法的实现逻辑

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
  g.mu.Lock()
  if g.m == nil {
    g.m = make(map[string]*call)
  }
  if c, ok := g.m[key]; ok {
    c.dups++
    g.mu.Unlock()
    c.wg.Wait()
    if e, ok := c.err.(*panicError); ok {
      panic(e)
    } else if c.err == errGoexit {
      runtime.Goexit()
    }
    return c.val, c.err, true
  }
  c := new(call)
  c.wg.Add(1)
  g.m[key] = c
  g.mu.Unlock()
  g.doCall(c, key, fn)
  return c.val, c.err, c.dups > 0
}

SingleFlight 定义了一个辅助对象 call,用于代表正在执行 fn 函数的请求或者是否已经执行完请求。

DoChan 方法的实现逻辑

而 DoChan 方法与 Do 方法类似:

// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
  ch := make(chan Result, 1)
  g.mu.Lock()
  if g.m == nil {
    g.m = make(map[string]*call)
  }
  if c, ok := g.m[key]; ok {
    c.dups++
    c.chans = append(c.chans, ch)
    g.mu.Unlock()
    return ch
  }
  c := &call{chans: []chan<- Result{ch}}
  c.wg.Add(1)
  g.m[key] = c
  g.mu.Unlock()
  go g.doCall(c, key, fn)
  return ch
}

Forget 方法的实现逻辑

// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
  g.mu.Lock()
  delete(g.m, key)
  g.mu.Unlock()
}

将 key 从 map 中删除。

总结

使用 SingleFlight 时,通过将多个请求合并成一个,降低并发访问的压力,极大地提升了系统性能,针对于多并发读请求的场景,可以考虑是否满足 SingleFlight 的使用情况。

而对于并发写请求的场景,如果是多次写只需要一次的情况,那么也是满足的。例如:每个 http 请求都会携带 token,每次请求都需要把 token 存入缓存或者写入数据库,如果多次并发请求同时来,只需要写一次即可

以上就是Go并发原语之SingleFlight请求合并方法实例的详细内容,更多关于Go SingleFlight 请求合并的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文