Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go I/O并发处理

Go语言高效I/O并发处理双缓冲和Exchanger模式实例探索

作者:晁岳攀(鸟窝) 鸟窝聊技术

这篇文章主要介绍了Go语言高效I/O并发处理双缓冲和Exchanger模式实例探索,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

双缓冲(double buffering)

双缓冲(double buffering)是高效处理 I/O 操作的一种并发技术,它使用两个 buffer,一个 goroutine 使用其中一个 buffer 进行写,而另一个 goroutine 使用另一个 buffer 进行读,然后进行交换。这样两个 goroutine 可能并发的执行,减少它们之间的等待和阻塞。

本文还提供了一个类似 Java 的java.util.concurrent.Exchanger[1]的 Go 并发原语,它可以用来在两个 goroutine 之间交换数据,快速实现双缓冲的模式。 这个并发原语可以在github.com/smallnest/exp/sync/Exchanger[2]找到。

double buffering 并发模式

双缓冲(double buffering)设计方式虽然在一些领域中被广泛的应用,但是我还没有看到它在并发模式中专门列出了,或者专门列为一种模式。这里我们不妨把它称之为双缓存模式

这是一种在 I/O 处理领域广泛使用的用来提速的编程技术,它使用两个缓冲区来加速计算机,该计算机可以重叠 I/O 和处理。一个缓冲区中的数据正在处理,而下一组数据被读入另一个缓冲区。 在流媒体应用程序中,一个缓冲区中的数据被发送到声音或图形卡,而另一个缓冲区则被来自源(Internet、本地服务器等)的更多数据填充。 当视频显示在屏幕上时,一个缓冲区中的数据被填充,而另一个缓冲区中的数据正在显示。当在缓冲区之间移动数据的功能是在硬件电路中实现的,而不是由软件执行时,全动态视频的速度会加快,不但速度被加快,而且可以减少黑屏闪烁的可能。

在这个模式中,两个 goroutine 并发的执行,一个 goroutine 使用一个 buffer 进行写(不妨称为 buffer1),而另一个 goroutine 使用另一个 buffer 进行读(不妨称为 buffer2)。如图所示。 当左边的 writer 写满它当前使用的 buffer1 后,它申请和右边的 goroutine 的 buffer2 进行交换,这会出现两种情况:

同样右边的 goroutine 也是同样的处理,当它读完 buffer2 后,它会申请和左边的 goroutine 的 buffer1 进行交换,这会出现两种情况:

这样两个 goroutine 就可以并发的执行,而不用等待对方的读写操作。这样可以提高并发处理的效率。

不仅仅如此, double buffering 其实可以应用于更多的场景, 不仅仅是 buffer 的场景,如 Java 的垃圾回收机制中,HotSpot JVM 把年轻代分为了三部分:1 个 Eden 区和 2 个 Survivor 区(分别叫 from 和 to,或者 s0 和 s1),在 GC 开始的时候,对象只会存在于 Eden 区和名为“From”的 Survivor 区,Survivor 区“To”是空的。紧接着进行 GC,Eden 区中所有存活的对象都会被复制到“To”,而在“From”区中,仍存活的对象会根据他们的年龄值来决定去向。年龄达到一定值的对象会被移动到年老代中,没有达到阈值的对象会被复制到“To”区域。经过这次 GC 后,Eden 区和 From 区已经被清空。这个时候,“From”和“To”会交换(exchange)他们的角色,也就是新的“To”就是上次 GC 前的“From”,新的“From”就是上次 GC 前的“To”。不管怎样,都会保证名为 To 的 Survivor 区域是空的。Minor GC 会一直重复这样的过程,直到“To”区被填满,“To”区被填满之后,会将所有对象移动到年老代中。

Exchanger 的实现

既然有这样的场景,有这样的需求,所以我们需要针对这样场景的一个同步原语。Java 给我们做了一个很好的师范,接下来我们使用实现相应的 Go,但是我们的实现和 Java 的实现完全不同,我们要基于 Go 既有的同步原语来实现。

基于 Java 实现的 Exchanger 的功能,我们也实现一个Exchanger, 我们期望它的功能如下:

如果你非常熟悉 Go 的各种同步原语,你可以很快的组合出这样一个同步原语。如果你还不是那么熟悉,建议你阅读《深入理解 Go 并发编程》这本书,京东有售。 下面是一个简单的实现,代码在Exchanger[3]。 我们只用leftright指代这两个 goroutine, goroutine 是 Go 语言中的并发单元,我们期望的就是这两个 goroutine 发生关系。

为了跟踪这两个 goroutine,我们需要使用 goroutine id 来标记这两个 goroutine,这样避免了第三者插入。

type Exchanger[T any] struct {
 leftGoID, rightGoID int64
 left, right         chan T
}

你必须使用 NewExchanger 创建一个Exchanger,它会返回一个Exchanger的指针。 初始化的时候我们把 left 和 right 的 id 都设置为-1,表示还没有 goroutine 使用它们,并且不会和所有的 goroutine 的 id 冲突。 同时我们创建两个 channel,一个用来左边的 goroutine 写,右边的 goroutine 读,另一个用来右边的 goroutine 写,左边的 goroutine 读。channel 的 buffer 设置为 1,这样可以避免死锁。

func NewExchanger[T any]( "T any") *Exchanger[T] {
 return &Exchanger[T]{
  leftGoID:  -1,
  rightGoID: -1,
  left:      make(chan T, 1),
  right:     make(chan T, 1),
 }
}

Exchange方法是核心方法,它用来交换数据,它的实现如下:

func (e *Exchanger[T]) Exchange(value T) T {
 goid := goroutine.ID()
 // left goroutine
 isLeft := atomic.CompareAndSwapInt64(&e.leftGoID, -1, goid)
 if !isLeft {
  isLeft = atomic.LoadInt64(&e.leftGoID) == goid
 }
 if isLeft {
  e.right <- value // send value to right
  return <-e.left  // wait for value from right
 }
 // right goroutine
 isRight := atomic.CompareAndSwapInt64(&e.rightGoID, -1, goid)
 if !isRight {
  isRight = atomic.LoadInt64(&e.rightGoID) == goid
 }
 if isRight {
  e.left <- value  // send value to left
  return <-e.right // wait for value from left
 }
 // other goroutine
 panic("sync: exchange called from neither left nor right goroutine")
}

当一个 goroutine 调用的时候,首先我们尝试把它设置为left,如果成功,那么它就是left。 如果不成功,我们就判断它是不是先前已经是left,如果是,那么它就是left。 如果先前,或者此时left已经被另一个 goroutine 占用了,它还有机会成为right,同样的逻辑检查和设置right

如果既不是left也不是right,那么就是第三者插入了,我们需要 panic,因为我们不希望第三者插足。

如果它是left,那么它就会把数据发送到right,然后等待right发送数据过来。 如果它是right,那么它就会把数据发送到left,然后等待left发送数据过来。

这样就实现了数据的交换。

Exchanger 的使用

我们使用一个简单的双缓冲例子来说明如何使用Exchanger,我们创建两个 goroutine,一个 goroutine 负责写,另一个 goroutine 负责读,它们之间通过Exchanger来交换数据。

 buf1 := bytes.NewBuffer(make([]byte, 1024))
 buf2 := bytes.NewBuffer(make([]byte, 1024))
 exchanger := syncx.NewExchanger[*bytes.Buffer]( "*bytes.Buffer")
 var wg sync.WaitGroup
 wg.Add(2)
 expect := 0
 go func() { // g1
  defer wg.Done()
  buf := buf1
  for i := 0; i < 10; i++ {
   for j := 0; j < 1024; j++ {
    buf.WriteByte(byte(j / 256))
    expect += j / 256
   }
   buf = exchanger.Exchange(buf)
  }
 }()
 var got int
 go func() { // g2
  defer wg.Done()
  buf := buf2
  for i := 0; i < 10; i++ {
   buf = exchanger.Exchange(buf)
   for _, b := range buf.Bytes() {
    got += int(b)
   }
   buf.Reset()
  }
 }()
 wg.Wait()
 fmt.Println(got)
 fmt.Println(expect == got)

在这个例子中 g1负责写,每个 buffer 的容量是 1024,写满就交给另外一个读 g2,并从读 g2 中交换过来一个空的 buffer 继续写。 交换 10 次之后,两个 goroutine 都退出了,我们检查写入的数据和读取的数据是否一致,如果一致,那么就说明我们的Exchanger实现是正确的。

总结

文本介绍了一种类似 Java 的Exchanger的同步原语的实现,这个同步原语可以在双缓冲的场景中使用,提高并发处理的性能。

参考资料

[1]java.util.concurrent.Exchanger: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Exchanger.html 

[2]github.com/smallnest/exp/sync/Exchanger: https://pkg.go.dev/github.com/smallnest/exp@v0.2.2/sync#Exchanger 

[3]Exchanger: https://pkg.go.dev/github.com/smallnest/exp@v0.2.2/sync#Exchanger 

以上就是Go语言高效I/O并发处理双缓冲和Exchanger模式实例探索的详细内容,更多关于Go I/O并发处理的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文