目录
  • 开篇
  • 信号量
  • semaphore 扩展库实现
    • Acquire
    • Release
    • TryAcquire
  • 总结

    开篇

    在我们此前的文章 Golang Mutex 原理解析 中曾提到过,Mutex 的底层结构包含了两个字段,state 和 sema:

    type Mutex struct {
        state int32 
        sema  uint32
    }
    
    • state 代表互斥锁的状态,比如是否被锁定;
    • sema 表示信号量,协程阻塞会等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程。

    这个 sema 就是 semaphore 信号量的意思。Golang 协程之间的抢锁,实际上争抢给Locked赋值的权利,能给 Locked 置为1,就说明抢锁成功。抢不到就阻塞等待 sema 信号量,一旦持有锁的协程解锁,那么等待的协程会依次被唤醒。

    有意思的是,虽然 semaphore 在锁的实现中起到了至关重要的作用,Golang 对信号量的实现却是隐藏在 runtime 中,并没有包含到标准库里来,在 src 源码中我们可以看到底层依赖的信号量相关函数。

    // defined in package runtime
    // Semacquire waits until *s > 0 and then atomically decrements it.
    // It is intended as a simple sleep primitive for use by the synchronization
    // library and should not be used directly.
    func runtime_Semacquire(s *uint32)
    // Semrelease atomically increments *s and notifies a waiting goroutine
    // if one is blocked in Semacquire.
    // It is intended as a simple wakeup primitive for use by the synchronization
    // library and should not be used directly.
    // If handoff is true, pass count directly to the first waiter.
    // skipframes is the number of frames to omit during tracing, counting from
    // runtime_Semrelease's caller.
    func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
    
    • runtime_Semacquire:阻塞等待直到 s 大于 0,然后立刻将 s 减去 1【原子操作】;
    • runtime_Semrelease:将 s 增加 1,然后通知一个阻塞在 runtime_Semacquire 的 goroutine【原子操作】。

    两个原子操作,一个 acquire,一个 release,其实就代表了对资源的获取和释放。Mutex 作为 sync 包的核心,支撑了 RWMutex,channel,singleflight 等多个并发控制的能力,而对信号量的管理又是 Mutex 的基础。

    虽然源码看不到,但 Golang 其实在扩展库 golang.org/x/sync/semaphore 也提供了一套信号量的实现,我们可以由此来参考一下,理解 semaphore 的实现思路。

    信号量

    在看源码之前,我们先理清楚【信号量】设计背后的场景和原理。

    信号量的概念是荷兰计算机科学家 Edsger Dijkstra 在 1963 年左右提出来的,广泛应用在不同的操作系统中。在系统中,会给每一个进程一个信号量,代表每个进程目前的状态。未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。

    在 Mutex 依赖的信号量机制中我们可以看到,这里本质就是依赖 sema 一个 uint32 的变量 + 原子操作来实现并发控制能力。当 goroutine 完成对信号量等待时,该变量 -1,当 goroutine 完成对信号量的释放时,该变量 +1。

    如果一个新的 goroutine 发现信号量不大于 0,说明资源暂时没有,就得阻塞等待。直到信号量 > 0,此时的语义是有新的资源,该goroutine就会结束等待,完成对信号量的 -1 并返回。注意我们上面有提到,runtime 支持的两个方法都是原子性的,不用担心两个同时在等待的 goroutine 同时抢占同一份资源。

    典型的信号量场景是【图书馆借书】。假设学校图书馆某热门书籍现在只有 100 本存货,但是上万学生都想借阅,怎么办?

    直接买一万本书是非常简单粗暴的解法,但资源有限,这不是长久之计。

    常见的解决方案很简单:学生们先登记,一个一个来。我们先给 100 个同学发出,剩下的你们继续等,等到什么时候借书的同学看完了,把书还回来了,就给排队等待的同学们发放。同时,为了避免超发,每发一个,都需要在维护的记录里将【余量】减去 1,每还回来一个,就把【余量】加上 1。

    runtime_Semacquire 就是排队等待借书,runtime_Semrelease 就是看完了把书归还给图书馆。

    另外需要注意,虽然我们上面举例的增加/减小的粒度都是 1,但这本质上只是一种场景,事实上就算是图书馆借书,也完全有可能出现一个人同时借了两本一模一样的书。所以,信号量的设计需要支持 N 个资源的获取和释放。

    所以,我们对于 acquire 和 release 两种操作的语义如下:

    • release: 将信号量增加 n【保证原子性】;
    • acquire: 若信号量 < n,阻塞等待,直到信号量 >= n,此时将信号量的值减去 n【保证原子性】。

    semaphore 扩展库实现

    这里我们结合golang.org/x/sync/semaphore 源码来看看怎样设计出来我们上面提到的信号量结构。

    // NewWeighted creates a new weighted semaphore with the given
    // maximum combined weight for concurrent access.
    func NewWeighted(n int64) *Weighted {
    	w := &Weighted{size: n}
    	return w
    }
    // Weighted provides a way to bound concurrent access to a resource.
    // The callers can request access with a given weight.
    type Weighted struct {
    	size    int64       // 最大资源数
    	cur     int64       // 当前已被使用的资源
    	mu      sync.Mutex
    	waiters list.List   // 等待队列
    }
    

    有意思的是,虽然包名是 semaphore,但是扩展库里真正给【信号量结构体】定义的名称是 Weighted。从上面的定义我们可以看到,传入初始资源个数 n(对应 size),就可以生成一个 Weighted 信号量结构。

    Golang信号量设计实现示例详解

    Weighted 提供了三个方法来实现对信号量机制的支持:

    • Acquire

    对应上面我们提到的 acquire 语义,注意我们提到过,抽象的来讲,acquire 成功与否其实不太看返回值,而是只要获取不了就一直阻塞,如果返回了,就意味着获取到了。

    但在 Golang 实现当中,我们肯定不希望,如果发生了异常 case,导致一直阻塞在这里。所以你可以看到 Acquire 的入参里有个 context.Context,借用 context 的上下文控制能力,你可以对此进行 cancel, 可以设置 timeout 等待超时,就能对 acquire 行为进行更多约束。

    所以,acquire 之后我们仍然需要检查返回值 error,如果为 nil,代表正常获取了资源。否则可能是 context 已经不合法了。

    • Release

    跟上面提到的 release 语义完全一致,传入你要释放的资源数 n,保证原子性地增加信号量。

    • TryAcquire

    这里其实跟 sync 包中的各类 TryXXX 函数定位很像。并发的机制中大都包含 fast path 和 slow path,比如首个 goroutine 先来 acquire,那么一定是能拿到的,后续再来请求的 goroutine 由于慢了一步,只能走 slow path 进行等待,自旋等操作。sync 包中绝大部分精华,都在于 slow path 的处理。fast path 大多是一个基于 atomic 包的原子操作,比如 CAS 就可以解决。

    TryAcquire 跟 Acquire 的区别在于,虽然也是要资源,但是不等待。有了我就获取,就减信号量,返回 trye。但是如果目前还没有,我不会阻塞在这里,而是直接返回 false。

    下面我们逐个方法看看,Weighted 是怎样实现的。

    Acquire

    // Acquire acquires the semaphore with a weight of n, blocking until resources
    // are available or ctx is done. On success, returns nil. On failure, returns
    // ctx.Err() and leaves the semaphore unchanged.
    //
    // If ctx is already done, Acquire may still succeed without blocking.
    func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    	s.mu.Lock()
    	if s.size-s.cur >= n && s.waiters.Len() == 0 {
    		s.cur += n
    		s.mu.Unlock()
    		return nil
    	}
    	if n > s.size {
    		// Don't make other Acquire calls block on one that's doomed to fail.
    		s.mu.Unlock()
    		<-ctx.Done()
    		return ctx.Err()
    	}
    	ready := make(chan struct{})
    	w := waiter{n: n, ready: ready}
    	elem := s.waiters.PushBack(w)
    	s.mu.Unlock()
    	select {
    	case <-ctx.Done():
    		err := ctx.Err()
    		s.mu.Lock()
    		select {
    		case <-ready:
    			// Acquired the semaphore after we were canceled.  Rather than trying to
    			// fix up the queue, just pretend we didn't notice the cancelation.
    			err = nil
    		default:
    			isFront := s.waiters.Front() == elem
    			s.waiters.Remove(elem)
    			// If we're at the front and there're extra tokens left, notify other waiters.
    			if isFront && s.size > s.cur {
    				s.notifyWaiters()
    			}
    		}
    		s.mu.Unlock()
    		return err
    	case <-ready:
    		return nil
    	}
    }
    

    在阅读之前回忆一下上面 Weighted 结构的定义,注意 Weighted 并没有维护一个变量用来表示【当前剩余的资源】,这一点是通过 size(初始化的时候设置,表示总资源数)减去 cur(当前已被使用的资源),二者作差得到的。

    我们来拆解一下上面这段代码:

    第一步:这是常规意义上的 fast path

    s.mu.Lock()
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
            s.cur += n
            s.mu.Unlock()
            return nil
    }
    
    • 先上锁,保证并发安全;
    • 校验如果 size – cur >= n,代表剩余的资源是足够,同时 waiters 这个等待队列为空,代表没有别的协程在等待;
    • 此时就没什么多想的,直接 cur 加上 n 即可,代表又消耗了 n 个资源,然后解锁返回,很直接。

    第二步:针对特定场景做提前剪枝

    if n > s.size {
            // Don't make other Acquire calls block on one that's doomed to fail.
            s.mu.Unlock()
            <-ctx.Done()
            return ctx.Err()
    }
    

    如果请求的资源数量,甚至都大于资源总数量了,说明这个协程心里没数。。。。就算我现在把所有初始化的资源都拿回来,也喂不饱你呀!!!那能怎么办,我就不烦劳后面流程处理了,直接等你的 context 什么时候 Done,给你返回 context 的错误就行了,同时先解个锁,别耽误别的 goroutine 拿资源。

    第三步:资源是够的,只是现在没有,那就把当前goroutine加到排队的队伍里

    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()
    

    这里 ready 结构是个空结构体的 channel,仅仅是为了实现协程间通信,通知什么时候资源 ready,建立一个属于这个 goroutine 的 waiter,然后塞到 Weighted 结构的等待队列 waiters 里。

    搞定了以后直接解锁,因为你已经来排队了,手续处理完成,以后的路有别的通知机制保证,就没必要在这儿拿着锁阻塞新来的 goroutine 了,人家也得排队。

    第四步:排队等待

    select {
        case <-ctx.Done():
                err := ctx.Err()
                s.mu.Lock()
                select {
                case <-ready:
                        // Acquired the semaphore after we were canceled.  Rather than trying to
                        // fix up the queue, just pretend we didn't notice the cancelation.
                        err = nil
                default:
                        isFront := s.waiters.Front() == elem
                        s.waiters.Remove(elem)
                        // If we're at the front and there're extra tokens left, notify other waiters.
                        if isFront && s.size > s.cur {
                                s.notifyWaiters()
                        }
                }
                s.mu.Unlock()
                return err
        case <-ready:
                return nil
        }
    

    一个 select 语句,只看两种情况:1. 这个 goroutine 的 context 超时了;2. 拿到了资源,皆大欢喜。

    重点在于 ctx.Done 分支里 default 的处理。我们可以看到,如果是超时了,此时还没拿到资源,首先会把当前 goroutine 从 waiters 等待队列里移除(合情合理,你既然因为自己的原因做不了主,没法继续等待了,就别耽误别人事了)。

    然后接着判断,若这个 goroutine 同时也是排在最前的 goroutine,而且恰好现在有资源了,就赶紧通知队里的 goroutine 们,伙计们,现在有资源了,赶紧来拿。我们来看看这个 notifyWaiters 干了什么:

    func (s *Weighted) notifyWaiters() {
    	for {
    		next := s.waiters.Front()
    		if next == nil {
    			break // No more waiters blocked.
    		}
    		w := next.Value.(waiter)
    		if s.size-s.cur < w.n {
    			// Not enough tokens for the next waiter.  We could keep going (to try to
    			// find a waiter with a smaller request), but under load that could cause
    			// starvation for large requests; instead, we leave all remaining waiters
    			// blocked.
    			//
    			// Consider a semaphore used as a read-write lock, with N tokens, N
    			// readers, and one writer.  Each reader can Acquire(1) to obtain a read
    			// lock.  The writer can Acquire(N) to obtain a write lock, excluding all
    			// of the readers.  If we allow the readers to jump ahead in the queue,
    			// the writer will starve — there is always one token available for every
    			// reader.
    			break
    		}
    		s.cur += w.n
    		s.waiters.Remove(next)
    		close(w.ready)
    	}
    }
    

    其实很简单,遍历 waiters 这个等待队列,拿到排队最前的 waiter,判断资源够不够,如果够了,增加 cur 变量,资源给你,然后把你从等待队列里移出去,再 close ready 那个goroutine 就行,算是通知一下。

    重点部分在于,如果资源不够怎么办?

    想象一下现在的处境,Weighted 这个 semaphore 的确有资源,而目前要处理的这个 goroutine 的的确确就是排队最靠前的,而且人家也没狮子大开口,要比你总 size 还大的资源。但是,但是,好巧不巧,现在你要的数量,比我手上有的少。。。。

    很无奈,那怎么办呢?

    无非两种解法:

    • 我先不管你,反正你要的不够,我先看看你后面那个 goroutine 人家够不够,虽然你现在是排位第一个,但是也得继续等着;
    • 没办法,你排第一,需求我就得满足,所以我们都继续等,等啥时候资源够了就给你。

    扩展库实际选用的是第 2 种策略,即一定要满足排在最前面的 goroutine,这里的考虑在注释里有提到,如果直接继续看后面的 goroutine 够不够,优先满足后面的,在一些情况下会饿死有大资源要求的 goroutine,设计上不希望这样的情况发生。

    简单说:要的多不是错,既然你排第一,目前货不多,那就大家一起阻塞等待,保障你的权利。

    Release

    // Release releases the semaphore with a weight of n.
    func (s *Weighted) Release(n int64) {
    	s.mu.Lock()
    	s.cur -= n
    	if s.cur &lt; 0 {
    		s.mu.Unlock()
    		panic("semaphore: released more than held")
    	}
    	s.notifyWaiters()
    	s.mu.Unlock()
    }
    

    Release 这里的实现非常简单,一把锁保障不出现并发,然后将 cur 减去 n 即可,说明此时又有 n 个资源回到了货仓。然后和上面 Acquire 一样,调用 notifyWaiters,叫排队第一的哥们(哦不,是 goroutine)来领东西了。

    TryAcquire

    // TryAcquire acquires the semaphore with a weight of n without blocking.
    // On success, returns true. On failure, returns false and leaves the semaphore unchanged.
    func (s *Weighted) TryAcquire(n int64) bool {
    	s.mu.Lock()
    	success := s.size-s.cur >= n && s.waiters.Len() == 0
    	if success {
    		s.cur += n
    	}
    	s.mu.Unlock()
    	return success
    }
    

    其实就是 Acquire 方法的 fast path,只是返回了个 bool,标识是否获取成功。

    总结

    今天我们了解了扩展库 semaphore 对于信号量的封装实现,整体代码加上注释也才 100 多行,是非常好的学习材料,建议大家有空了对着源码再过一遍。Acquire 和 Release 的实现都很符合直觉。

    其实,我们使用 buffered channel 其实也可以模拟出来 n 个信号量的效果,但就不具备 semaphore Weighted 这套实现里面,一次获取多个资源的能力了。

    以上就是Golang信号量设计实现示例详解的详细内容,更多关于Go信号量设计的资料请关注其它相关文章!

    声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。