Go并发编程sync.Cond的具体使用
作者:麦超
简介
Go
标准库提供 Cond
原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond
通常应用于等待某个条件的一组 goroutine
,等条件变为 true
的时候,其中一个 goroutine
或者所有的 goroutine
都会被唤醒执行。
Cond
是和某个条件相关,这个条件需要一组 goroutine
协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine
都会被阻塞住,只有这一组 goroutine
通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。
这个条件可以是我们自定义的 true/false
逻辑表达式。
但是 Cond
使用的比较少,因为在大部分场景下是可以被 Channel
和 WaitGroup
来替换的。
详细介绍
下面就是 Cond
的数据结构和对外提供的方法,Cond
内部维护了一个等待队列和锁实例。
type Cond struct { noCopy noCopy // 锁 L Locker // 等待队列 notify notifyList checker copyChecker } func NeWCond(l Locker) *Cond func (c *Cond) Broadcast() func (c *Cond) Signal() func (c *Cond) Wait()
NeWCond:
NeWCond
方法需要调用者传入一个Locker
接口,这个接口就Lock/UnLock
方法,所以我们可以传入一个sync.Metex
对象Signal:允许调用者唤醒一个等待当前
Cond
的goroutine
。如果Cond
等待队列中有一个或者多个等待的goroutine
,则从等待队列中移除第一个goroutine
并把它唤醒Broadcast:允许调用者唤醒所有等待当前
Cond
的goroutine
。如果 Cond 等待队列中有一个或者多个等待的goroutine
,则清空所有等待的goroutine
,并全部唤醒Wait:会把调用者放入
Cond
的等待队列中并阻塞,直到被Signal
或者Broadcast
的方法从等待队列中移除并唤醒
案例:Redis连接池
可以看一下下面的代码,使用了 Cond
实现一个 Redis
的连接池,最关键的代码就是在链表为空的时候需要调用 Cond
的 Wait
方法,将 gorutine
进行阻塞。然后 goruntine
在使用完连接后,将连接返回池子后,需要通知其他阻塞的 goruntine
来获取连接。
package main import ( "container/list" "fmt" "math/rand" "sync" "time" ) // 连接池 type Pool struct { lock sync.Mutex // 锁 clients list.List // 连接 cond *sync.Cond // cond实例 close bool // 是否关闭 } // Redis Client type Client struct { id int32 } // 创建Redis Client func NewClient() *Client { return &Client{ id: rand.Int31n(100000), } } // 关闭Redis Client func (this *Client) Close() { fmt.Printf("Client:%d 正在关闭", this.id) } // 创建连接池 func NewPool(maxConnNum int) *Pool { pool := new(Pool) pool.cond = sync.NewCond(&pool.lock) // 创建连接 for i := 0; i < maxConnNum; i++ { client := NewClient() pool.clients.PushBack(client) } return pool } // 从池子中获取连接 func (this *Pool) Pull() *Client { this.lock.Lock() defer this.lock.Unlock() // 已关闭 if this.close { fmt.Println("Pool is closed") return nil } // 如果连接池没有连接 需要阻塞 for this.clients.Len() <= 0 { this.cond.Wait() } // 从链表中取出头节点,删除并返回 ele := this.clients.Remove(this.clients.Front()) return ele.(*Client) } // 将连接放回池子 func (this *Pool) Push(client *Client) { this.lock.Lock() defer this.lock.Unlock() if this.close { fmt.Println("Pool is closed") return } // 向链表尾部插入一个连接 this.clients.PushBack(client) // 唤醒一个正在等待的goruntine this.cond.Signal() } // 关闭池子 func (this *Pool) Close() { this.lock.Lock() defer this.lock.Unlock() // 关闭连接 for e := this.clients.Front(); e != nil; e = e.Next() { client := e.Value.(*Client) client.Close() } // 重置数据 this.close = true this.clients.Init() } func main() { var wg sync.WaitGroup pool := NewPool(3) for i := 1; i <= 10; i++ { wg.Add(1) go func(index int) { defer wg.Done() // 获取一个连接 client := pool.Pull() fmt.Printf("Time:%s | 【goruntine#%d】获取到client[%d]\n", time.Now().Format("15:04:05"), index, client.id) time.Sleep(time.Second * 5) fmt.Printf("Time:%s | 【goruntine#%d】使用完毕,将client[%d]放回池子\n", time.Now().Format("15:04:05"), index, client.id) // 将连接放回池子 pool.Push(client) }(i) } wg.Wait() }
运行结果:
Time:15:10:25 | 【goruntine#7】获取到client[31847]
Time:15:10:25 | 【goruntine#5】获取到client[27887]
Time:15:10:25 | 【goruntine#10】获取到client[98081]
Time:15:10:30 | 【goruntine#5】使用完毕,将client[27887]放回池子
Time:15:10:30 | 【goruntine#6】获取到client[27887]
Time:15:10:30 | 【goruntine#10】使用完毕,将client[98081]放回池子
Time:15:10:30 | 【goruntine#7】使用完毕,将client[31847]放回池子
Time:15:10:30 | 【goruntine#1】获取到client[31847]
Time:15:10:30 | 【goruntine#9】获取到client[98081]
Time:15:10:35 | 【goruntine#6】使用完毕,将client[27887]放回池子
Time:15:10:35 | 【goruntine#3】获取到client[27887]
Time:15:10:35 | 【goruntine#1】使用完毕,将client[31847]放回池子
Time:15:10:35 | 【goruntine#4】获取到client[31847]
Time:15:10:35 | 【goruntine#9】使用完毕,将client[98081]放回池子
Time:15:10:35 | 【goruntine#2】获取到client[98081]
Time:15:10:40 | 【goruntine#3】使用完毕,将client[27887]放回池子
Time:15:10:40 | 【goruntine#8】获取到client[27887]
Time:15:10:40 | 【goruntine#2】使用完毕,将client[98081]放回池子
Time:15:10:40 | 【goruntine#4】使用完毕,将client[31847]放回池子
Time:15:10:45 | 【goruntine#8】使用完毕,将client[27887]放回池子
注意点
- 在调用
Wait
方法前,需要先加锁,就像我上面例子中Pull
方法也是先加锁
看一下源码就知道了,因为 Wait
方法的执行逻辑是先将 goruntine
添加到等待队列中,然后释放锁,然后阻塞,等唤醒后,会继续加锁。如果在调用 Wait
前不加锁,但是里面会解锁,执行的时候就会报错。
// // c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() // func (c *Cond) Wait() { c.checker.check() // 添加到等待队列 t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // 阻塞 runtime_notifyListWait(&c.notify, t) c.L.Lock() }
- 还是
Wait
方法,在唤醒后需要继续检查Cond
条件
就拿上面的 redis
连接案例来进行说明吧,我这里是使用了 for
循环来进行检测。如果将 for
循环改成使用 if
,也就是只判断一次,会有什么问题?可以停下来先想想
上面说了调用者也可以使用 Broadcast
方法来唤醒 goruntine
,如果使用的是 Broadcast
方法,所有的 goruntine
都会被唤醒,然后大家都去链表中去获取 redis
连接了,就会出现部分 goruntine
拿不到连接,实际上没有那么多连接可以获取,因为每次只会放回一个连接到池子中。
// 如果连接池没有连接 需要阻塞 for this.clients.Len() <= 0 { this.cond.Wait() } // 获取连接 ele := this.clients.Remove(this.clients.Front()) return ele.(*Client)
到此这篇关于Go并发编程sync.Cond的具体使用的文章就介绍到这了,更多相关Go sync.Cond内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!