Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > go单机锁

golang中单机锁的具体实现详解

作者:MelonTe

这篇文章主要为大家详细介绍了golang中单机锁的具体实现的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

1、锁的概念引入

首先,为什么需要锁?

在并发编程中,多个线程或进程可能同时访问和修改同一个共享资源(例如变量、数据结构、文件)等,若不引入合适的同步机制,会引发以下问题:

因此,我们需要一把锁,来保证同一时间只有一个人能写数据,确保共享资源在并发访问下的正确性和一致性。

在这里,引入两种常见的并发控制处理机制,即乐观锁与悲观锁:

乐观锁:假定在并发操作中,资源的抢占并不是很激烈,数据被修改的可能性不是很大,那这时候就不需要对共享资源区进行加锁再操作,而是先修改了数据,最终来判断数据有没有被修改,没有被修改则提交修改指,否则重试。

悲观锁:与乐观锁相反,它假设场景的资源竞争激烈,对共享资源区的访问必须要求持有锁。

针对不同的场景需要采取因地制宜的策略,比较乐观锁与悲观所,它们的优缺点显而易见:

策略优点缺点
乐观锁不需要实际上锁,性能高若冲突时,需要重新进行操作,多次重试可能会导致性能下降明显
悲观锁访问数据一定需要持有锁,保证并发场景下的数据正确性加锁期间,其他等待锁的线程需要被阻塞,性能低

2、Sync.Mutex

Go对单机锁的实现,考虑了实际环境中协程对资源竞争程度的变化,制定了一套锁升级的过程。具体方案如下:

从乐观转向悲观的判定规则如下,满足其中之一即发生转变:

除此之外,为了防止被阻塞的协程等待过长时间也没有获取到锁,导致用户的整体体验下降,引入了饥饿的概念:

饥饿模式与正常模式的转变规则如下:

普通模式->饥饿模式:存在阻塞的协程,阻塞时间超过1ms

饥饿模式->普通模式:阻塞队列清空,亦或者获得锁的协程的等待时间小于1ms,则恢复

接下来步入源码,观看具体的实现。

2.1、数据结构

位于包sync/mutex.go中,对锁的定义如下:

type Mutex struct {
    state int32
    sema  uint32
}

将state看作一个二进制字符串,它存储信息的规则如下:

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
    starvationThresholdNs = 1e6 //饥饿阈值
)

2.2、获取锁Lock()

func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    m.lockSlow()
}

尝试直接通过CAS操作直接获取锁,若成功则返回,否则说明锁被获取,步入LockSlow。

2.3、LockSlow()

源码较长,进行拆分讲解:

var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state

(1)定义了基本的常量,含义如下:

for {
        //处于上锁状态,并且不处于饥饿状态中,并且当前的协程允许继续自旋下去
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
 
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        //...
    }

(2)进入尝试获取锁的循环中,两个if表示:

这里存在的唯一疑惑为,为什么要将awoke标识为true?

首先,因为当前锁并非处于饥饿模式,因此当前的抢占锁的模式是不公平的,若当前锁的阻塞队列还没有被唤醒的协程,那就要求不要唤醒了,尝试让当前正在尝试的协程获取到锁,避免唤醒协程进行资源竞争。

for {
        //...
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }
        //...
}        

(3)进行状态更新:

当协程从步骤2走出来时,只能说明它位于以下两个状态之一:

不论如何,都需要进行一些状态的更新,为接下来的打算做准备。

用new存储一个锁即将要进入的新状态信息,更新规则:

虽然更新的有点多,但是可以归纳为

(4)尝试更新信息:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            //...
        } else {
            old = m.state
        }

接下来尝试将new更新进state,若更新失败,说明当前有另一个协程介入了,为了防止数据的一致性丢失,要全部重来一次。

(5)状态更新成功,具体判断是要沉睡还是获取锁成功:

步入步骤4的if主支中,此时有两个状态:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
    //...
        } else {
            //...
        }

因为当前状态,可能是锁释放了,检查锁更新前是否已经被释放了并且不是饥饿模式,若是那说明获取锁成功了,函数结束了。

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 2)
    //....
        } else {
            //...
        }

否则,说明当前协程要进入阻塞态了,记录一下开始阻塞的时间,用于醒来是判断是否饥饿。然后进入阻塞沉睡中。

(6)若步骤5进入阻塞,则被唤醒后:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 2)
                //唤醒
              starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
    //若锁处于饥饿模式
            if old&mutexStarving != 0 {
                //锁的异常处理
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                //将要更新的信号量
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
    
    //....
        } else {
            //...
        }

从阻塞中唤醒,首先计算一些协程的阻塞时间,以及当前的最新锁状态。

若锁处于饥饿模式:那么当前协程将直接获取锁,当前协程是因为饥饿模式被唤醒的,不存在其他协程抢占锁。于是更新信号量,将记录阻塞协程数-1,将锁的上锁态置1。若当前从饥饿模式唤醒的协程,等待时间已经不到1ms了或者是最后一个等待的协程,那么将将锁从饥饿模式转化为正常模式。至此,获取成功,退出函数。

否则,只是普通的随机唤醒,于是开始尝试进行抢占,回到步骤1。

2.4、释放锁Unlock()

func (m *Mutex) Unlock() {
    //直接释放锁
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

通过原子操作,直接将锁的mutexLocked标识置为0。若置0后,锁的状态不为0,那就说明存在需要获取锁的协程,步入unlockSlow。

2.5、unlockSlow()

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        fatal("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 2)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 2)
    }
}

(1)首先进行了异常状态处理,若释放了一个已经释放了到锁,那么直接fatal,程序终止。

if (new+mutexLocked)&mutexLocked == 0 {
        fatal("sync: unlock of unlocked mutex")
    }

(2)若锁不处于饥饿状态:

否则,处于饥饿状态,唤醒等待最久的协程。

3、Sync.RWMutex

对于共享资源区的操作,可以划分为读与写两大类。假设在一个场景中,对共享资源区继续读的操作远大于写的操作,如果每个协程的读操作都需要获取互斥锁,这带来的性能损耗是非常大的。

RWMutex是一个可以运用在读操作>写操作中的提高性能的锁,可以将它视为由一个读锁与一个写锁构成。其运作规则具体如下:

3.1、数据结构

type RWMutex struct {
    w           Mutex        // held if there are pending writers
    writerSem   uint32       // semaphore for writers to wait for completing readers
    readerSem   uint32       // semaphore for readers to wait for completing writers
    readerCount atomic.Int32 // number of pending readers
    readerWait  atomic.Int32 // number of departing readers
}
const rwmutexMaxReaders = 1 << 30 //最大的读协程数量

3.2、读锁流程RLock()

func (rw *RWMutex) RLock() {
    if rw.readerCount.Add(1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    }
}

对readerCount+1,表示新加入一个读协程。若结果<0,说明当前锁正在被写协程占据,令当前的读协程阻塞。

3.3、读释放锁流程RUnlock()

func (rw *RWMutex) RUnlock() {
    if r := rw.readerCount.Add(-1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
}

对readerCount-1,表示减少一个读协程。若结果<0,说明当前锁正在被写协程占据,步入runlockslow。

3.4、rUnlockSlow()

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        fatal("sync: RUnlock of unlocked RWMutex")
    }
    if rw.readerWait.Add(-1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

首先进行错误处理,若发现当前协程为占用过读锁,或者读流程的协程数量上限,系统出现异常,fatal。

否则,对readerWait-1,若结果为0,说明当前协程是最后一个介入读锁流程的协程,此时需要释放一个写锁。

3.5、写锁流程Lock()

func (rw *RWMutex) Lock() {
    // First, resolve competition with other writers.
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    if r != 0 && rw.readerWait.Add(r) != 0 {
        runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
    }
}

首先尝试获取写锁,若获取成功,需要将readerCount-最大读协程数,表示现在锁被读协程占据。

r表示处于读流程的协程数量,若r不为0,那么就将readerWait加上r,等这些读协程都读取完毕,再去写。将这个写协程阻塞。(读写锁并非读、写公平,读协程优先。)

3.6、写释放锁流程Unlock()

func (rw *RWMutex) Unlock() {
    // Announce to readers there is no active writer.
    r := rw.readerCount.Add(rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        fatal("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any.
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()
}

重新将readerCount置为正常指,表示释放了写锁。若读协程超过最大上限,则异常。

然后唤醒所有阻塞的读协程。(读协程优先)

解锁。

以上就是golang中单机锁的具体实现详解的详细内容,更多关于go单机锁的资料请关注脚本之家其

您可能感兴趣的文章:
阅读全文