目录
  • 引言
  • Mutex结构
  • 饥饿模式和正常模式
    • 正常模式
    • 饥饿模式
    • 状态的切换
  • 加锁和解锁
    • 加锁
      • 自旋
      • 计算锁的新状态
      • 更新锁状态
    • 解锁
    • 可能遇到的问题
      • 锁拷贝
        • panic导致没有unlock

        引言

        Golang的并发编程令人着迷,使用轻量的协程、基于CSP的channel、简单的go func()就可以开始并发编程,在并发编程中,往往离不开锁的概念。

        本文介绍了常用的同步原语 sync.Mutex,同时从源码剖析它的结构与实现原理,最后简单介绍了mutex在日常使用中可能遇到的问题,希望大家读有所获。

        Mutex结构

        Mutex运行时数据结构位于sync/mutex.go

        type Mutex struct {
           state int32
           sema  uint32
        }
        

        其中state表示当前互斥锁的状态,sema表示 控制锁状态的信号量.

        互斥锁的状态定义在常量中:

        const (
           mutexLocked = 1 << iota // 1 ,处于锁定状态; 2^0
           mutexWoken // 2 ;从正常模式被从唤醒;  2^1
           mutexStarving // 4 ;处于饥饿状态;    2^2
           mutexWaiterShift = iota // 3 ;获得互斥锁上等待的Goroutine个数需要左移的位数: 1 << mutexWaiterShift
           starvationThresholdNs = 1e6 // 锁进入饥饿状态的等待时间
        )
        

        0即其他状态。

        sema是一个组合,低三位分别表示锁的三种状态,高29位表示正在等待互斥锁释放的gorountine个数,和Java表示线程池状态那部分有点类似

        Golang Mutex互斥锁深入理解

        一个mutex对象仅占用8个字节,让人不禁感叹其设计的巧妙

        Golang Mutex互斥锁深入理解

        饥饿模式和正常模式

        正常模式

        在正常模式下,等待的协程会按照先进先出的顺序得到锁 在正常模式下,刚被唤醒的goroutine与新创建的goroutine竞争时,大概率无法获得锁。

        饥饿模式

        为了避免正常模式下,goroutine被“饿死”的情况,go在1.19版本引入了饥饿模式,保证了Mutex的公平性

        在饥饿模式中,互斥锁会直接交给等待队列最前面的goroutine。新的goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。

        状态的切换

        在正常模式下,一旦Goroutine超过1ms没有获取到锁,它就会将当前互斥锁切换饥饿模式

        如果一个goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

        加锁和解锁

        加锁

        func (m *Mutex) Lock() {
           // Fast path: grab unlocked mutex.
           if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
              return
           }
           // 原注释: Slow path (outlined so that the fast path can be inlined)
           // 将
           m.lockSlow()
        }
        

        可以看到,当前互斥锁的状态为0时,尝试将当前锁状态设置为更新锁定状态,且这些操作是原子的。

        若当前状态不为0,则进入lockSlow方法
        先定义了几个参数

        var waitStartTime int64
        starving := false // 
        awoke := false
        iter := 0
        old := m.state
        

        随后进入一个很大的for循环,让我们来逐步分析

        自旋

        for {
             // 1 && 2 
           if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
              //  3. 
              if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                 awoke = true
              }
              runtime_doSpin()
              iter++
              old = m.state
              continue
           }
        

        old&(mutexLocked|mutexStarving) == mutexLocked

        当且仅当当前锁状态为mutexLocked时,表达式为true

        runtime_canSpin(iter) 是否满足自旋条件

        • 运行在拥有多个CPU的机器上;
        • 当前Goroutine为了获取该锁进入自旋的次数小于四次;
        • 当前机器上至少存在一个正在运行的处理器 P,并且处理的运行队列为空;

        如果当前状态下自旋是合理的,将awoke置为true,同时设置锁状态为mutexWoken,进入自旋逻辑

        runtime_doSpin()会执行30次PAUSE指令,并且仅占用CPU资源 代码位于:runtime\asm_amd64.s +567

        //go:linkname sync_runtime_doSpin sync.runtime_doSpin
        //go:nosplit
        func sync_runtime_doSpin() {
           procyield(active_spin_cnt)
        }
        
        TEXT runtime·procyield(SB),NOSPLIT,$0-0
            MOVL    cycles+0(FP), AX
        again:
            PAUSE 
            SUBL    $1, AX
            JNZ again
            RET
        

        计算锁的新状态

        停止了自旋后,

        new := old
        // 1. 
        if old&mutexStarving == 0 {
           new |= mutexLocked
        }
        // 2.
        if old&(mutexLocked|mutexStarving) != 0 {
           new += 1 << mutexWaiterShift
        }
        // 3 && 4. 
        if starving && old&mutexLocked != 0 {
           new |= mutexStarving
        }
        // 5. 
        if awoke {
           if new&mutexWoken == 0 {
              throw("sync: inconsistent mutex state")
           }
           new &^= mutexWoken
        }
        
        • old&mutexStarving == 0 表明原来不是饥饿模式。如果是饥饿模式的话,其他goroutine不会执行接下来的代码,直接进入等待队列队尾
        • 如果原来是 mutexLocked 或者 mutexStarving模式,waiterCounts数加一
        • 如果被标记为饥饿状态,且锁状态为mutexLocked的话,设置锁的新状态为饥饿状态。
        • 被标记为饥饿状态的前提是 被唤醒过且抢锁失败
        • 计算新状态

        更新锁状态

        // 1.
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
              if old&(mutexLocked|mutexStarving) == 0 {
                 break // locked the mutex with CAS
              }
              // 2. 
              queueLifo := waitStartTime != 0
              if waitStartTime == 0 {
                 waitStartTime = runtime_nanotime()
              }
              // 3.
              runtime_SemacquireMutex(&m.sema, queueLifo, 1)
              // 4.
              starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
              old = m.state
              // 5.
              if old&mutexStarving != 0 {
                 /
                 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                 }
                 delta := int32(mutexLocked - 1<<mutexWaiterShift)
                 if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                 }
                 atomic.AddInt32(&m.state, delta)
                 break
              }
              awoke = true
              iter = 0
           } else {
              old = m.state
           }
        }
        
        • 尝试将锁状态设置为new 。这里设置成功不代表上锁成功,有可能new不为mutexLocked 或者是waiterCount数量的改变
        • waitStartTime不为0 说明当前goroutine已经等待过了,将当前goroutine放到等待队列的队头
        • 走到这里,会调用runtime_SemacquireMutex 方法使当前协程阻塞,runtime_SemacquireMutex方法中会不断尝试获得锁,并会陷入休眠 等待信号量释放。
        • 当前协程可以获得信号量,从runtime_SemacquireMutex方法中返回。此时协程会去更新starving标志位:如果当前starving标志位为true或者等待时间超过starvationThresholdNs ,将starving置为true

        之后会按照饥饿模式与正常模式,走不同的逻辑

        • – 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;  
        • – 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;

        解锁

        Golang Mutex互斥锁深入理解

        func (m *Mutex) Unlock() {
           // 1.
           new := atomic.AddInt32(&m.state, -mutexLocked)
           if new != 0 {
              // 2. 
              m.unlockSlow(new)
           }
        }
        
        • 将锁状态的值增加 -mutexLocked 。如果新状态不等于0,进入unlockSlow方法
        func (m *Mutex) unlockSlow(new int32) {
            // 1. 
           if (new+mutexLocked)&mutexLocked == 0 {
              throw("sync: unlock of unlocked mutex")
           }
           if new&mutexStarving == 0 {
              old := new
              for {
              // 2.
                 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                    return
                 }
                 // 2.1.
                 new = (old - 1<<mutexWaiterShift) | mutexWoken
                 if atomic.CompareAndSwapInt32(&m.state, old, new) {
                 // 2.2.
                    runtime_Semrelease(&m.sema, false, 1)
                    return
                 }
                 old = m.state
              }
           } else {
           // 3.
              runtime_Semrelease(&m.sema, true, 1)
           }
        }
        

        1.new+mutexLocked代表将锁置为1,如果两个状态& 不为0,则说明重复解锁.如果重复解锁则抛出panic

        2. 如果等待者数量等于0,或者锁的状态已经变为mutexWoken、mutexStarving、mutexStarving,则直接返回

        • 将waiterCount数量-1,尝试选择一个goroutine唤醒
        • 尝试更新锁状态,如果更新锁状态成功,则唤醒队尾的一个gorountine

        3. 如果不满足 2的判断条件,则进入饥饿模式,同时交出锁的使用权

        可能遇到的问题

        锁拷贝

        mu1 := &sync.Mutex{}
        mu1.Lock()
        mu2 := mu1
        mu2.Unlock()
        

        此时mu2能够正常解锁,那么我们再试试解锁mu1

        mu1 := &sync.Mutex{}
        mu1.Lock()
        mu2 := mu1
        mu2.Unlock()
        mu1.Unlock()
        

        Golang Mutex互斥锁深入理解

        可以看到发生了error

        panic导致没有unlock

        当lock()之后,可能由于代码问题导致程序发生了panic,那么mutex无法被及时unlock(),由于其他协程还在等待锁,此时可能触发死锁

        func TestWithLock() {
           nums := 100
           wg := &sync.WaitGroup{}
           safeSlice := SafeSlice{
              s:    []int{},
              lock: new(sync.RWMutex),
           }
           i := 0
           for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
              wg.Add(1)
              go func() {
                 defer func() {
                    if r := recover(); r != nil {
                       log.Println("recover")
                    }
                    wg.Done()
                 }()
                 safeSlice.lock.Lock()
                 safeSlice.s = append(safeSlice.s, i)
                 if i == 98{
                    panic("123")
                 }
                 i++
                 safeSlice.lock.Unlock()
              }()
           }
           wg.Wait()
           log.Println(len(safeSlice.s))
        }
        

        Golang Mutex互斥锁深入理解

        修改:

        func TestWithLock() {
           nums := 100
           wg := &sync.WaitGroup{}
           safeSlice := SafeSlice{
              s:    []int{},
              lock: new(sync.RWMutex),
           }
           i := 0
           for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
              wg.Add(1)
              go func() {
                 defer func() {
                    if r := recover(); r != nil {
                    }
                    safeSlice.lock.Unlock()
                    wg.Done()
                 }()
                 safeSlice.lock.Lock()
                 safeSlice.s = append(safeSlice.s, i)
                 if i == 98{
                    panic("123")
                 }
                 i++
              }()
           }
           wg.Wait()
           log.Println(len(safeSlice.s))
        }

        以上就是Golang Mutex互斥锁深入理解的详细内容,更多关于Golang Mutex互斥锁的资料请关注其它相关文章!

        声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。