Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > golang channel的正确使用

golang踩坑实战之channel的正确使用方式

作者:黄杨峻

Golang channel是Go语言中一个非常重要的特性,除了用来处理并发编程的任务中,它还可以用来进行消息传递和事件通知,这篇文章主要给大家介绍了关于golang踩坑实战之channel的正确使用方式,需要的朋友可以参考下

一、为什么要用channel

笔者也是从Java转Go的选手,之前一直很难摆脱线程池、可重入锁、AQS等数据结构及其底层的思维定式。而最近笔者也开始逐渐回顾过往的实习和实验,慢慢领悟了golang并发的一些经验了。

golang在解决并发race问题时,首要考虑的方案是使用channel。可能很多人会喜欢用互斥锁sync.Mutex,因为mutex lock只有Lock和Unlock两种操作,与Java中的ReentrantLock比较类似。但笔者实践过程中发现:

互斥锁只能做到阻塞,而无法让流程之间通信。如果不同流程之间需要交流,则需要一个类似于信号量一样的机制。同时,最好该机制能实现流程控制。譬如控制不同任务执行的先后顺序,让任务等待未完成的任务,以及打断某个轮转的状态。

如何实现这些功能?channel就是Go给出的一个优雅的答案。(当然并不是说channel可完全替代锁,锁可以使得代码和逻辑更简单)

二、基本操作

2.1 channel

channel可以看作一个FIFO的队列,队列进出都是原子操作。队列内部元素的类型可以自由选择。以下给出channel的常见操作

//初始化
ss := make(chan struct{})
sb := make(chan bool)
var s chan bool
si = make(chan int)
// 写
si <- 1
sb <- true
ss <- struct{}
//读
<-sb
i := <-si
fmt.Print(i+1)//2
// 使用完毕的channel可close
close(si)

2.2 channel缓存

一般来说,channel有带缓存和不带缓存两种。

不带缓存的channel读和写都是阻塞的,一旦某个channel发生写操作,除非另一个goroutine使用读操作将元素从channel取出,否则当前goroutine会一直阻塞。反之,如果一个不带缓存的channel被一个goroutine读取,除非另一个goroutine对该channel发起写入,否则当前goroutine会一直被阻塞。

下面这个单元测试的结果是编译器报错,提示死锁。

func TestChannel0(t *testing.T) {
	c := make(chan int)
	c <- 1
}

fatal error: all goroutines are asleep - deadlock!

如果要正确运行,应修改为

func TestChannel0(t *testing.T) {
	c := make(chan int)
	go func(c chan int) { <-c }(c)
	c <- 1
}

带通道缓存的channel的特点是,有缓存空间时可以写入数据后直接返回,缓存中有数据时可以直接读出。如果缓存空间写满,同时没有被读取,那写入会阻塞。同理,如果缓存空间没有数据,读入也会阻塞,直到有数据被写入。

//会成功执行
func TestChannel1(t *testing.T) {
	c := make(chan int,1)
	go func(c chan int) { c <- 1 }(c)
	<-c
}

//不会死锁,因为缓存空间未填满
func TestChannel2(t *testing.T) {
	c := make(chan int,1)
	c<-1
}

//会死锁,因为缓存空间填满后仍继续写入
func TestChannel3(t *testing.T) {
	c := make(chan int,1)
	c<-1
	c<-1
}

//会死锁,因为一直读取阻塞,没有写入
func TestChannel4(t *testing.T) {
	c := make(chan int,1)
	<-c
}

2.3 只读只写channel

有些channel可以被定义为只能用于写入,或者只能用于发送。

下面是具体例子

func sender(c chan<- bool){
	c <- true
	//<- c // 这一句会报错
}
func receiver(c <-chan bool){
	//c <- true// 这一句会报错
	<- c
}
func normal(){
	senderChan := make(chan<- bool)
	receiverChan := make(<-chan bool)
}

2.4 select

select允许goroutine对多个channel操作进行同时监听,当某个case子句可以运行时,该case下面的逻辑会执行,且select语句结束。如果定义了default语句,且各个case中的执行均被阻塞无法完成时,程序便会进入default的逻辑中。

值得注意的是,如果有多个case可以满足,最终执行的case语句是不确定的(不同于switch语句的从上到下依次判断是否满足)。

下面用一个例子来说明

func writeTrue(c chan bool) {
	c <- false
}
// 输出为 chan 1, 因为chan 1有可读数据
func TestSelect0(t *testing.T) {
	chan1 := make(chan bool,1)
	chan2 := make(chan bool,1)
	writeTrue(chan1)
	select {
	case <-chan1:
		fmt.Print("chan 1")
	case <-chan2:
		fmt.Print("chan 2")
	default:
		fmt.Print("default")
	}
}
// 输出为default, 因为chan1和chan2都无数据可读
func TestSelect1(t *testing.T) {
	chan1 := make(chan bool,1)
	chan2 := make(chan bool,1)
	select {
	case <-chan1:
		fmt.Print("chan 1")
	case <-chan2:
		fmt.Print("chan 2")
	default:
		fmt.Print("default")
	}
}
// 输出为 chan 1或chan 2, 因为chan 1 和chan 2均有可读数据
func TestSelect2(t *testing.T) {
	chan1 := make(chan bool,1)
	chan2 := make(chan bool,1)
	writeTrue(chan1)
	writeTrue(chan2)
	select {
	case <-chan1:
		fmt.Print("chan 1")
	case <-chan2:
		fmt.Print("chan 2")
	default:
		fmt.Print("default")
	}
}

2.5 for range

对channel的for range循环可以依次从channel中读取数据,读取数据前是不知道里面有多少元素的,如果channel中没有元素,则会阻塞等待,直到channel被关闭,退出循环。如果代码中没有关闭channel的逻辑,或者插入break语句的话,就会产生死锁。

func testLoopChan() {
	c := make(chan int)
	go func() {
		c <- 1
		c <- 2
		c <- 3
		time.Sleep(time.Second * 2)
		close(c)
	}()
	for x := range c {
		fmt.Printf("test:%+v\n", x)
	}
}

//结果
test:1
test:2
test:3
结束

这里需要注意,被for range轮询过的对象可以被视为已经从channel取出,下面我们拿两个例子来说明:

func testLoopChan2() {
	c := make(chan int)
	go func() {
		c <- 1
		c <- 2
		c <- 3
	}()
	for x := range c {
		fmt.Printf("test:%+v\n", x)
		break
	}
	<-c
	<-c
}
//输出
1

func testLoopChan3() {
	c := make(chan int)
	go func() {
		c <- 1
		c <- 2
		c <- 3
	}()
	for x := range c {
		fmt.Printf("test:%+v\n", x)
		break
	}
	<-c
	<-c
	<-c
}
//输出死锁,因为channel已经取空,最后的<-操作会导致阻塞

三、使用

3.1 状态机轮转

channel的一个核心用法就是流程控制,对于状态机轮转场景,channel可以轻松解决(经典的轮流打印ABC)。

func main(){
    chanA :=make(chan struct{},1)
    chanB :=make(chan struct{},1)
    chanC :=make(chan struct{},1)
    
    chanA<- struct{}{}
    
    go printA(chanA,chanB)
    go printB(chanB,chanC)
    go printC(chanC,chanA)
}

func printA(chanA chan struct{}, chanB chan struct{}) {
    for {
        <-chanA
        println("A")
        chanB<- struct{}{}
    }
}

func printB(chanB chan struct{}, chanC chan struct{}) {
    for {
        <-chanB
        println("B")
        chanC<- struct{}{}
    }
}

func printC(chanC chan struct{}, chanA chan struct{}) {
    for {
        <-chanC
        println("C")
        chanA<- struct{}{}
    }
}

3.2 流程退出

这是我在raft实验中get到的小技能,用一个channel表示是否需要退出。select中监听该channel,一旦被写入,即可进入退出逻辑

exit := make (chan bool)
//...
for {
	select {
		case <-exit:
			fmt.Print("exit code")
			return
		default:
			fmt.Print("normal code")
			//...
	}
}

3.3 超时控制

这也是我在raft实验中get到的技能,如果某个任务返回,可以在该任务对应的channel写入,由select读出。同时用一个case来计时,如果超过该时间仍然没有完成,则进入超时逻辑

func control(){
	taskAChan := make (chan bool)
	TaskA(taskAChan)
	select {
		case <-taskAChan:
			fmt.Print("taskA success")
		case <- <-time.After(5 * time.Second):
			ftm.Print("timeover")
	}
}

func TaskA(taskAChan chan bool){
	//TaskA的主要代码
	//...
	// 完成TaskA后才写入channel
	taskAChan <- true
}

3.4 带并发数限制的goroutine池

我实习的时候曾经碰到一个需求,需要并发地向目标服务器发起ftp请求,但是同一时间能发起的连接数量是有限的,需要由buffer channel对其进行控制。该channel有点类似于信号量,读取写入会导致缓存空间的变化。缓存在这里起的作用类似于信号量(写入读取对应PV操作),进行任务时会写入channel,完成任务时会读取channel。如果缓存空间耗尽,就会新的写入请求会阻塞,直到某一个任务完成缓存空间释放。

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1 // 等待放行;
    process(r)
    // 可能需要一个很长的处理过程;
    <-sem // 完成,放行另一个过程。
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req) // 无需等待 handle 完成。
    }
}

3.5 溢出缓存

在高并发环境下,为了避免请求丢失,可以选择将来不及处理的请求缓存。这也是使用select可以实现的功能,如果一个buffer channel写满,在default逻辑中将其缓存。

func put(c message){
	select {
		case putChannel <- c:
			fmt.Print("put success")
		default:
			fmt.Print("buffer data")
			buffer(c)
	}
}

3.6 随机概率分发

select {
        case b := <-backendMsgChan:
        if sampleRate > 0 && rand.Int31n(100) > sampleRate {
            continue
        } 
}

四、坑和经验

4.1 panic

以下几种情况会导致panic

可以用ok值检查channel是否为空或者关闭

queue := make(chan int, 1)

value, ok := <-queue
if !ok {
    fmt.Println("queue is closed or nil")
	queue = nil
}

4.2 关闭的channel如果使用range会提前返回

channel 关闭会导致range返回

4.3 对reset channel进行写入

如果一个结构体的channel成员有机会被重置,它的写入必须考虑失败。

下面例子中,写入跳转到了default逻辑

type chanTest struct {
	c chan bool
}

func TestResetChannel(t *testing.T) {
	cc := chanTest{c: make(chan bool)}
	go cc.resetChan()
	select {
	case cc.c <- true:
		log.Printf("cc.c in")
	default:
		log.Printf("default")

	}
}

func (c *chanTest) resetChan() {
	c.c = make(chan bool)
}

总结

到此这篇关于golang踩坑实战之channel的正确使用方式的文章就介绍到这了,更多相关golang channel的正确使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文