Go channel发送方和接收方如何相互阻塞等待源码解读
作者:菜皮日记
并发编程的可见性
在 Go 官网上的内存模型一文中,介绍了在 Go 并发编程下数据可见性问题,可见性是并发编程中一个重要概念,指的是在哪些条件下,可以保证一个线程中读取某个变量时,可以观察到另一个线程对该变量的写入后的值,Go 语言中的 goroutine 也适用。
一般来说可见性属于偏硬件和底层,因为涉及到多核 CPU 的 cache 读写和同步问题,开发者不需要关心细节,高级编程语言要么屏蔽掉了这些细节,要么会给出一些保证,承诺在确定的条件下就会得到确定的结果。
Go channel 有一个特性是在一个无缓冲的 channel 上发送和接收必须等待对方准备好,才可以执行,否则会被阻塞。实际上这就是一个同步保证,那么这个同步保证是如何实现的?下面看看官方文章中是如何解释的。
先 send 后 receive
文中对 channel 的描述有几个原则,第一个是
A send on a channel is synchronized before the completion of the corresponding receive from that channel.
意思是:在一个 channel 上的发送操作应该发生在对应的接收操作完成之前。说人话就是:要先发送数据,然后才能接收数据,否则就会阻塞。这也比较符合一般的认知。
并用下面一段代码举例说明,这段代码确保一定会输出 "hello, world”。
var c = make(chan int, 10) var a string func f() { a = "hello, world" c <- 0 } func main() { go f() <-c print(a) }
f
函数负责给变量 a
赋值,main
函数负责打印变量 a
。main
函数阻塞等待在 <- c
处,直到 f
函数对 a
赋值之后并写入数据到 c
中,main
函数才被唤醒继续执行,所以此时打印 a
必然会得到结果。
先 receive 后 send?
而下面这段描述有点反直觉
A receive from an unbuffered channel is synchronized before the completion of the corresponding send on that channel.
意思是在无缓冲 channel 上的接收操作发生在对应的发送操作完成之前,说人话就是:要先接收数据,之后才可以发送数据,否则就会阻塞。这句话看上去与第一条相悖,因为第一条强调发送操作要在接收完成之前发生,而这一条强调接收操作要在发送完成之前发生,这样相互等待对方的情况,不会陷入死锁状态吗?
下面的示例代码与前一个类似,区别是将 c 换成了无缓冲 channel,并把 c 的写入和读取调换了位置,这段代码同样可以保证输出 "hello, world”。
var c = make(chan int) var a string func f() { a = "hello, world" <-c } func main() { go f() c <- 0 print(a) }
这两段话到底是什么意思?为什么要相互等待但又不会死锁?
接下来看看 runtime/chan.go 中是怎么实现 channel 的发送和接收的。
channel 的结构
首先看看 channel 的数据结构
type hchan struct { qcount uint // 缓冲区元素数量 dataqsiz uint // 缓冲区大小 buf unsafe.Pointer // 缓冲区起始指针 elemsize uint16 closed uint32 elemtype *_type sendx uint // 下一次发送的元素在队列中的索引 recvx uint // 下一个接收的元素在队列中的索引 recvq waitq // 当队列无数据时,receiver 阻塞等待的队列 sendq waitq // 当队列无空间时,sender 阻塞等待的队列 lock mutex }
channel 内部实现了一个环形队列,通过 qcount
dataqsiz
buf
sendx
recvx
几个部分组成。
另外 channel 还维护了两个等待队列,如果在执行 <-c
receive 操作时,此时 channel 不满足接收条件,receiver 会进入 recvq 等待队列;同样的如果执行 c<-
send 操作时,此时 channel 不满足发送条件,sender 会进入 sendq 等待队列。
具体看代码:
var c = make(chan int) var a string func f() { a = "hello, world" x := <-c // 3 fmt.Println("\nx:", x) } func main() { go f() // 1 c <- 123456 // 2 print(a) }
send 具体干了什么
当 main
函数执行到 c<-123456
是,会执行 runtime/chan.go 中的 chansend
函数,该函数首先会判断当前 channel c 的等待接收队列是否有阻塞的 receiver
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // ...省略部分代码... // 是否有等待的 receiver 存在 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // ...省略部分代码... }
如果有等待的 receiver 则弹出队列,调用 send
函数,其中 sg
就表示 receiver
,sg.elem
表示将数据接收到哪里去,这个地址也就对应示例代码中的变量 x 的地址。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // ...省略部分代码... if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } // ...省略部分代码... // 将 goroutine 置为可执行状态 }
sendDirect
函数就是直接从 src
里面将数据复制到 dst
中。
// 直接拷贝数据 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
回到 chansend
函数,如果没有等待的 receiver
,那么会查看当前 buf 中是否有空间,如果有空间,则数据缓存到 buf 中。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // ...省略部分代码... // 将数据缓存到 buf 中 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } // ...省略部分代码... }
如果也没有 buf 空间,那么就将 sender 本身放入到 sendq 等待队列中。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // ...省略部分代码... // 进入 sendq 等待队列 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // ...省略部分代码... }
总结起来 send 操作分三部分:
- 如果当前 channel 上有等待的 receiver,则直接 copy 数据过去
- 否则如果当前 buf 有空闲空间,则将数据存在 buf 中
- 否则将 sender 本身加入到 sendq 等待队列中
receive 具体干了什么
相应的与发送类似,执行到示例代码中第 (3) 步接收数据时,会调用 runtime/chan.go 中的 chanrecv
函数来处理接收,同样是先看 sender 等待队列是否有阻塞的 sender
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // ...省略部分代码... // 从等待的 sender 取一个出来 if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // ...省略部分代码... }
如果有的等待的 sender,那么将 sender 取出来,并复制数据。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // ...省略部分代码... if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } // ...省略部分代码... } func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
如果没有等待的 sender,那么看 buf 中有没有缓存的数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // ...省略部分代码... if c.qcount > 0 { qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } // ...省略部分代码... }
最后如果也没有 buf 数据,那么久把自己加入到 receiver 等待队列中 recvq
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // ...省略部分代码... gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // ...省略部分代码... }
总结起来 receive 操作分三部分:
- 如果当前 channel 上有等待的 sender,则直接 copy 数据过去
- 否则如果当前 buf 有缓存的数据,则将读取该数据
- 否则将 receiver 本身加入到 recvq 等待队列中
小结
这样一来就能够理解前面的两个原则了,在一个无缓冲的 channel 中,无论是 sender 先执行,还是 receiver 先执行,都会因为找不到对方,并且没有 buf 空间的情况下,将自己加入到等待队列;当对方开始执行时就会检查到已经有对端正在阻塞,进而拷贝数据,并唤醒阻塞的对象最终走完整个流程。
有一种说法是:sender 必须在 receiver 准备好才能执行,否则就会阻塞;而 receiver 必须在 sender 准备好才能执行,否则就会阻塞;这个说法没错,但是太笼统了,什么叫准备好?怎么算是准备好?这是比较模糊的。而看过 send 和 receive 的流程之后,就更能理解整个过程了。
为什么要有无缓冲 channel
实际上两个 goroutine 相互等待对方到达某个状态的效果,非常类似操作系统中的一种同步机制:屏障 barrier,同步屏障要求只有当所有进程都到达屏障后,才能一起执行下一状态,否则就阻塞在屏障处。
回到 channel 操作,即 sender 和 receiver 无论谁先执行,都必须等待对方也已经执行,两者才可以继续执行。就像一块电路板串联有两个开关,要想电路联通,必须两个开关都被打开才可以,而不管哪一个先打开,都必须等待另一个开关也打开,之后电流才可以接通电路也才联通。
可以将无缓冲 channel 看做是一种同步屏障,同步屏障能够让多个 goroutine 都达到某种状态之后才可以继续执行,这是带缓冲 channel 无法做到的。另外在无缓冲 channel 数据的交换更加简单快速,因为不需要维护缓存 buf,实现逻辑也更简单,运行更可靠。
以上就是Go channel发送方和接收方如何相互阻塞等待源码解读的详细内容,更多关于Go channel相互阻塞等待的资料请关注脚本之家其它相关文章!