Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go channel阻塞等待

Go channel发送方和接收方如何相互阻塞等待源码解读

作者:菜皮日记

这篇文章主要为大家介绍了Go channel发送方和接收方如何相互阻塞等待源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

并发编程的可见性

在 Go 官网上的内存模型一文中,介绍了在 Go 并发编程下数据可见性问题,可见性是并发编程中一个重要概念,指的是在哪些条件下,可以保证一个线程中读取某个变量时,可以观察到另一个线程对该变量的写入后的值,Go 语言中的 goroutine 也适用。

一般来说可见性属于偏硬件和底层,因为涉及到多核 CPU 的 cache 读写和同步问题,开发者不需要关心细节,高级编程语言要么屏蔽掉了这些细节,要么会给出一些保证,承诺在确定的条件下就会得到确定的结果。

Go channel 有一个特性是在一个无缓冲的 channel 上发送和接收必须等待对方准备好,才可以执行,否则会被阻塞。实际上这就是一个同步保证,那么这个同步保证是如何实现的?下面看看官方文章中是如何解释的。

先 send 后 receive

文中对 channel 的描述有几个原则,第一个是

A send on a channel is synchronized before the completion of the corresponding receive from that channel.

意思是:在一个 channel 上的发送操作应该发生在对应的接收操作完成之前。说人话就是:要先发送数据,然后才能接收数据,否则就会阻塞。这也比较符合一般的认知。

并用下面一段代码举例说明,这段代码确保一定会输出 "hello, world”。

var c = make(chan int, 10)
var a string

func f() {
    a = "hello, world"
    c <- 0
}

func main() {
    go f()
    <-c
    print(a)
}

f 函数负责给变量 a 赋值,main 函数负责打印变量 amain 函数阻塞等待在 <- c 处,直到 f 函数对 a 赋值之后并写入数据到 c 中,main 函数才被唤醒继续执行,所以此时打印 a 必然会得到结果。

先 receive 后 send?

而下面这段描述有点反直觉

A receive from an unbuffered channel is synchronized before the completion of the corresponding send on that channel.

意思是在无缓冲 channel 上的接收操作发生在对应的发送操作完成之前,说人话就是:要先接收数据,之后才可以发送数据,否则就会阻塞。这句话看上去与第一条相悖,因为第一条强调发送操作要在接收完成之前发生,而这一条强调接收操作要在发送完成之前发生,这样相互等待对方的情况,不会陷入死锁状态吗?

下面的示例代码与前一个类似,区别是将 c 换成了无缓冲 channel,并把 c 的写入和读取调换了位置,这段代码同样可以保证输出 "hello, world”。

var c = make(chan int)
var a string

func f() {
    a = "hello, world"
    <-c
}

func main() {
    go f()
    c <- 0
    print(a)
}

这两段话到底是什么意思?为什么要相互等待但又不会死锁?

接下来看看 runtime/chan.go 中是怎么实现 channel 的发送和接收的。

channel 的结构

首先看看 channel 的数据结构

type hchan struct {
    qcount   uint           // 缓冲区元素数量
    dataqsiz uint           // 缓冲区大小
    buf      unsafe.Pointer // 缓冲区起始指针
    elemsize uint16
    closed   uint32
    elemtype *_type
    sendx    uint   // 下一次发送的元素在队列中的索引
    recvx    uint   // 下一个接收的元素在队列中的索引
    recvq    waitq  // 当队列无数据时,receiver 阻塞等待的队列
    sendq    waitq  // 当队列无空间时,sender 阻塞等待的队列

    lock mutex
}

channel 内部实现了一个环形队列,通过 qcount dataqsiz buf sendx recvx 几个部分组成。

另外 channel 还维护了两个等待队列,如果在执行 <-c receive 操作时,此时 channel 不满足接收条件,receiver 会进入 recvq 等待队列;同样的如果执行 c<- send 操作时,此时 channel 不满足发送条件,sender 会进入 sendq 等待队列。

具体看代码:

var c = make(chan int)
var a string

func f() {
    a = "hello, world"

    x := <-c    // 3
    fmt.Println("\nx:", x)
}

func main() {
    go f()      // 1
    c <- 123456 // 2

    print(a)
}

send 具体干了什么

当 main 函数执行到 c<-123456 是,会执行 runtime/chan.go 中的 chansend 函数,该函数首先会判断当前 channel c 的等待接收队列是否有阻塞的 receiver

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ...省略部分代码...

  // 是否有等待的 receiver 存在
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

  // ...省略部分代码...
}

如果有等待的 receiver 则弹出队列,调用 send 函数,其中 sg 就表示 receiversg.elem 表示将数据接收到哪里去,这个地址也就对应示例代码中的变量 x 的地址。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  // ...省略部分代码...

    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }

  // ...省略部分代码...
  // 将 goroutine 置为可执行状态
}

sendDirect 函数就是直接从 src 里面将数据复制到 dst 中。

// 直接拷贝数据
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

回到 chansend 函数,如果没有等待的 receiver,那么会查看当前 buf 中是否有空间,如果有空间,则数据缓存到 buf 中。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ...省略部分代码...

  // 将数据缓存到 buf 中
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

  // ...省略部分代码...
}

如果也没有 buf 空间,那么就将 sender 本身放入到 sendq 等待队列中。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ...省略部分代码...

  // 进入 sendq 等待队列
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)

  // ...省略部分代码...
}

总结起来 send 操作分三部分:

receive 具体干了什么

相应的与发送类似,执行到示例代码中第 (3) 步接收数据时,会调用 runtime/chan.go 中的 chanrecv 函数来处理接收,同样是先看 sender 等待队列是否有阻塞的 sender

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ...省略部分代码...

  // 从等待的 sender 取一个出来
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
  // ...省略部分代码...
}

如果有的等待的 sender,那么将 sender 取出来,并复制数据。

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // ...省略部分代码...
    if ep != nil {
        // copy data from sender
        recvDirect(c.elemtype, sg, ep)
    }
  // ...省略部分代码...
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

如果没有等待的 sender,那么看 buf 中有没有缓存的数据

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ...省略部分代码...
    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
  // ...省略部分代码...
}

最后如果也没有 buf 数据,那么久把自己加入到 receiver 等待队列中 recvq

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ...省略部分代码...

    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)

  // ...省略部分代码...
}

总结起来 receive 操作分三部分:

小结

这样一来就能够理解前面的两个原则了,在一个无缓冲的 channel 中,无论是 sender 先执行,还是 receiver 先执行,都会因为找不到对方,并且没有 buf 空间的情况下,将自己加入到等待队列;当对方开始执行时就会检查到已经有对端正在阻塞,进而拷贝数据,并唤醒阻塞的对象最终走完整个流程。

有一种说法是:sender 必须在 receiver 准备好才能执行,否则就会阻塞;而 receiver 必须在 sender 准备好才能执行,否则就会阻塞;这个说法没错,但是太笼统了,什么叫准备好?怎么算是准备好?这是比较模糊的。而看过 send 和 receive 的流程之后,就更能理解整个过程了。

为什么要有无缓冲 channel

实际上两个 goroutine 相互等待对方到达某个状态的效果,非常类似操作系统中的一种同步机制:屏障 barrier,同步屏障要求只有当所有进程都到达屏障后,才能一起执行下一状态,否则就阻塞在屏障处。

回到 channel 操作,即 sender 和 receiver 无论谁先执行,都必须等待对方也已经执行,两者才可以继续执行。就像一块电路板串联有两个开关,要想电路联通,必须两个开关都被打开才可以,而不管哪一个先打开,都必须等待另一个开关也打开,之后电流才可以接通电路也才联通。

可以将无缓冲 channel 看做是一种同步屏障,同步屏障能够让多个 goroutine 都达到某种状态之后才可以继续执行,这是带缓冲 channel 无法做到的。另外在无缓冲 channel 数据的交换更加简单快速,因为不需要维护缓存 buf,实现逻辑也更简单,运行更可靠。

以上就是Go channel发送方和接收方如何相互阻塞等待源码解读的详细内容,更多关于Go channel相互阻塞等待的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文