Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go信号量设计

Golang信号量设计实现示例详解

作者:ag9920

这篇文章主要为大家介绍了Golang信号量设计实现示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

开篇

在我们此前的文章 Golang Mutex 原理解析 中曾提到过,Mutex 的底层结构包含了两个字段,state 和 sema:

type Mutex struct {
    state int32 
    sema  uint32
}

这个 sema 就是 semaphore 信号量的意思。Golang 协程之间的抢锁,实际上争抢给Locked赋值的权利,能给 Locked 置为1,就说明抢锁成功。抢不到就阻塞等待 sema 信号量,一旦持有锁的协程解锁,那么等待的协程会依次被唤醒。

有意思的是,虽然 semaphore 在锁的实现中起到了至关重要的作用,Golang 对信号量的实现却是隐藏在 runtime 中,并没有包含到标准库里来,在 src 源码中我们可以看到底层依赖的信号量相关函数。

// defined in package runtime
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

两个原子操作,一个 acquire,一个 release,其实就代表了对资源的获取和释放。Mutex 作为 sync 包的核心,支撑了 RWMutex,channel,singleflight 等多个并发控制的能力,而对信号量的管理又是 Mutex 的基础。

虽然源码看不到,但 Golang 其实在扩展库 golang.org/x/sync/semaphore 也提供了一套信号量的实现,我们可以由此来参考一下,理解 semaphore 的实现思路。

信号量

在看源码之前,我们先理清楚【信号量】设计背后的场景和原理。

信号量的概念是荷兰计算机科学家 Edsger Dijkstra 在 1963 年左右提出来的,广泛应用在不同的操作系统中。在系统中,会给每一个进程一个信号量,代表每个进程目前的状态。未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。

在 Mutex 依赖的信号量机制中我们可以看到,这里本质就是依赖 sema 一个 uint32 的变量 + 原子操作来实现并发控制能力。当 goroutine 完成对信号量等待时,该变量 -1,当 goroutine 完成对信号量的释放时,该变量 +1。

如果一个新的 goroutine 发现信号量不大于 0,说明资源暂时没有,就得阻塞等待。直到信号量 > 0,此时的语义是有新的资源,该goroutine就会结束等待,完成对信号量的 -1 并返回。注意我们上面有提到,runtime 支持的两个方法都是原子性的,不用担心两个同时在等待的 goroutine 同时抢占同一份资源。

典型的信号量场景是【图书馆借书】。假设学校图书馆某热门书籍现在只有 100 本存货,但是上万学生都想借阅,怎么办?

直接买一万本书是非常简单粗暴的解法,但资源有限,这不是长久之计。

常见的解决方案很简单:学生们先登记,一个一个来。我们先给 100 个同学发出,剩下的你们继续等,等到什么时候借书的同学看完了,把书还回来了,就给排队等待的同学们发放。同时,为了避免超发,每发一个,都需要在维护的记录里将【余量】减去 1,每还回来一个,就把【余量】加上 1。

runtime_Semacquire 就是排队等待借书,runtime_Semrelease 就是看完了把书归还给图书馆。

另外需要注意,虽然我们上面举例的增加/减小的粒度都是 1,但这本质上只是一种场景,事实上就算是图书馆借书,也完全有可能出现一个人同时借了两本一模一样的书。所以,信号量的设计需要支持 N 个资源的获取和释放。

所以,我们对于 acquire 和 release 两种操作的语义如下:

semaphore 扩展库实现

这里我们结合golang.org/x/sync/semaphore 源码来看看怎样设计出来我们上面提到的信号量结构。

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
	size    int64       // 最大资源数
	cur     int64       // 当前已被使用的资源
	mu      sync.Mutex
	waiters list.List   // 等待队列
}

有意思的是,虽然包名是 semaphore,但是扩展库里真正给【信号量结构体】定义的名称是 Weighted。从上面的定义我们可以看到,传入初始资源个数 n(对应 size),就可以生成一个 Weighted 信号量结构。

Weighted 提供了三个方法来实现对信号量机制的支持:

对应上面我们提到的 acquire 语义,注意我们提到过,抽象的来讲,acquire 成功与否其实不太看返回值,而是只要获取不了就一直阻塞,如果返回了,就意味着获取到了。

但在 Golang 实现当中,我们肯定不希望,如果发生了异常 case,导致一直阻塞在这里。所以你可以看到 Acquire 的入参里有个 context.Context,借用 context 的上下文控制能力,你可以对此进行 cancel, 可以设置 timeout 等待超时,就能对 acquire 行为进行更多约束。

所以,acquire 之后我们仍然需要检查返回值 error,如果为 nil,代表正常获取了资源。否则可能是 context 已经不合法了。

跟上面提到的 release 语义完全一致,传入你要释放的资源数 n,保证原子性地增加信号量。

这里其实跟 sync 包中的各类 TryXXX 函数定位很像。并发的机制中大都包含 fast path 和 slow path,比如首个 goroutine 先来 acquire,那么一定是能拿到的,后续再来请求的 goroutine 由于慢了一步,只能走 slow path 进行等待,自旋等操作。sync 包中绝大部分精华,都在于 slow path 的处理。fast path 大多是一个基于 atomic 包的原子操作,比如 CAS 就可以解决。

TryAcquire 跟 Acquire 的区别在于,虽然也是要资源,但是不等待。有了我就获取,就减信号量,返回 trye。但是如果目前还没有,我不会阻塞在这里,而是直接返回 false。

下面我们逐个方法看看,Weighted 是怎样实现的。

Acquire

// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	s.mu.Lock()
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n
		s.mu.Unlock()
		return nil
	}
	if n > s.size {
		// Don't make other Acquire calls block on one that's doomed to fail.
		s.mu.Unlock()
		<-ctx.Done()
		return ctx.Err()
	}
	ready := make(chan struct{})
	w := waiter{n: n, ready: ready}
	elem := s.waiters.PushBack(w)
	s.mu.Unlock()
	select {
	case <-ctx.Done():
		err := ctx.Err()
		s.mu.Lock()
		select {
		case <-ready:
			// Acquired the semaphore after we were canceled.  Rather than trying to
			// fix up the queue, just pretend we didn't notice the cancelation.
			err = nil
		default:
			isFront := s.waiters.Front() == elem
			s.waiters.Remove(elem)
			// If we're at the front and there're extra tokens left, notify other waiters.
			if isFront && s.size > s.cur {
				s.notifyWaiters()
			}
		}
		s.mu.Unlock()
		return err
	case <-ready:
		return nil
	}
}

在阅读之前回忆一下上面 Weighted 结构的定义,注意 Weighted 并没有维护一个变量用来表示【当前剩余的资源】,这一点是通过 size(初始化的时候设置,表示总资源数)减去 cur(当前已被使用的资源),二者作差得到的。

我们来拆解一下上面这段代码:

第一步:这是常规意义上的 fast path

s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
}

第二步:针对特定场景做提前剪枝

if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
}

如果请求的资源数量,甚至都大于资源总数量了,说明这个协程心里没数。。。。就算我现在把所有初始化的资源都拿回来,也喂不饱你呀!!!那能怎么办,我就不烦劳后面流程处理了,直接等你的 context 什么时候 Done,给你返回 context 的错误就行了,同时先解个锁,别耽误别的 goroutine 拿资源。

第三步:资源是够的,只是现在没有,那就把当前goroutine加到排队的队伍里

ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()

这里 ready 结构是个空结构体的 channel,仅仅是为了实现协程间通信,通知什么时候资源 ready,建立一个属于这个 goroutine 的 waiter,然后塞到 Weighted 结构的等待队列 waiters 里。

搞定了以后直接解锁,因为你已经来排队了,手续处理完成,以后的路有别的通知机制保证,就没必要在这儿拿着锁阻塞新来的 goroutine 了,人家也得排队。

第四步:排队等待

select {
    case <-ctx.Done():
            err := ctx.Err()
            s.mu.Lock()
            select {
            case <-ready:
                    // Acquired the semaphore after we were canceled.  Rather than trying to
                    // fix up the queue, just pretend we didn't notice the cancelation.
                    err = nil
            default:
                    isFront := s.waiters.Front() == elem
                    s.waiters.Remove(elem)
                    // If we're at the front and there're extra tokens left, notify other waiters.
                    if isFront && s.size > s.cur {
                            s.notifyWaiters()
                    }
            }
            s.mu.Unlock()
            return err
    case <-ready:
            return nil
    }

一个 select 语句,只看两种情况:1. 这个 goroutine 的 context 超时了;2. 拿到了资源,皆大欢喜。

重点在于 ctx.Done 分支里 default 的处理。我们可以看到,如果是超时了,此时还没拿到资源,首先会把当前 goroutine 从 waiters 等待队列里移除(合情合理,你既然因为自己的原因做不了主,没法继续等待了,就别耽误别人事了)。

然后接着判断,若这个 goroutine 同时也是排在最前的 goroutine,而且恰好现在有资源了,就赶紧通知队里的 goroutine 们,伙计们,现在有资源了,赶紧来拿。我们来看看这个 notifyWaiters 干了什么:

func (s *Weighted) notifyWaiters() {
	for {
		next := s.waiters.Front()
		if next == nil {
			break // No more waiters blocked.
		}
		w := next.Value.(waiter)
		if s.size-s.cur < w.n {
			// Not enough tokens for the next waiter.  We could keep going (to try to
			// find a waiter with a smaller request), but under load that could cause
			// starvation for large requests; instead, we leave all remaining waiters
			// blocked.
			//
			// Consider a semaphore used as a read-write lock, with N tokens, N
			// readers, and one writer.  Each reader can Acquire(1) to obtain a read
			// lock.  The writer can Acquire(N) to obtain a write lock, excluding all
			// of the readers.  If we allow the readers to jump ahead in the queue,
			// the writer will starve — there is always one token available for every
			// reader.
			break
		}
		s.cur += w.n
		s.waiters.Remove(next)
		close(w.ready)
	}
}

其实很简单,遍历 waiters 这个等待队列,拿到排队最前的 waiter,判断资源够不够,如果够了,增加 cur 变量,资源给你,然后把你从等待队列里移出去,再 close ready 那个goroutine 就行,算是通知一下。

重点部分在于,如果资源不够怎么办?

想象一下现在的处境,Weighted 这个 semaphore 的确有资源,而目前要处理的这个 goroutine 的的确确就是排队最靠前的,而且人家也没狮子大开口,要比你总 size 还大的资源。但是,但是,好巧不巧,现在你要的数量,比我手上有的少。。。。

很无奈,那怎么办呢?

无非两种解法:

扩展库实际选用的是第 2 种策略,即一定要满足排在最前面的 goroutine,这里的考虑在注释里有提到,如果直接继续看后面的 goroutine 够不够,优先满足后面的,在一些情况下会饿死有大资源要求的 goroutine,设计上不希望这样的情况发生。

简单说:要的多不是错,既然你排第一,目前货不多,那就大家一起阻塞等待,保障你的权利。

Release

// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	s.cur -= n
	if s.cur &lt; 0 {
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
	s.notifyWaiters()
	s.mu.Unlock()
}

Release 这里的实现非常简单,一把锁保障不出现并发,然后将 cur 减去 n 即可,说明此时又有 n 个资源回到了货仓。然后和上面 Acquire 一样,调用 notifyWaiters,叫排队第一的哥们(哦不,是 goroutine)来领东西了。

TryAcquire

// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock()
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

其实就是 Acquire 方法的 fast path,只是返回了个 bool,标识是否获取成功。

总结

今天我们了解了扩展库 semaphore 对于信号量的封装实现,整体代码加上注释也才 100 多行,是非常好的学习材料,建议大家有空了对着源码再过一遍。Acquire 和 Release 的实现都很符合直觉。

其实,我们使用 buffered channel 其实也可以模拟出来 n 个信号量的效果,但就不具备 semaphore Weighted 这套实现里面,一次获取多个资源的能力了。

以上就是Golang信号量设计实现示例详解的详细内容,更多关于Go信号量设计的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文