Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > goland sync包

深度剖析golang中的sync包

作者:Clarence Liu

go语言的sync包提供了多种并发控制工具,如Mutex、RWMutex、WaitGroup、Once、Cond和Map等,用于保护共享资源和协调多个goroutine的执行,这些工具的原理和使用方法在文中都有详细介绍,感兴趣的朋友跟随小编一起看看吧

简介

// 示例:Happens-Before关系
var a string
var done bool
func setup() {
    a = "hello, world"  // 写操作A
    done = true         // 写操作B
}
func main() {
    go setup()
    for !done {        // 读操作C
        // 忙等待
    }
    print(a)           // 读操作D
}

关键点

Mutex

func main() {
	var mu sync.Mutex
	mu.Lock()
	defer mu.Unlock()
	mu.TryLock()
}

锁结构如下

type Mutex struct {
    state int32  // 锁状态:包含多个标志位
    sema  uint32 // 信号量:用于阻塞goroutine
}
func (m *Mutex) Lock() {
    // 快速路径:尝试原子操作获取锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    // 慢速路径:锁已被持有,需要等待
    m.lockSlow()
}

RWMutex

func (rw *RWMutex) Lock() {
    // 1. 获取互斥锁
    rw.w.Lock()
    // 2. 设置readerCount为负值,阻止新读者
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
    // 3. 等待现有读者
    if r != 0 {
        runtime_Semacquire(&rw.writerSem)
    }
}
func (rw *RWMutex) RLock() {
    // 检查是否有写者(readerCount < 0表示有写者)
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // 有写者在等待或正在写,读者必须阻塞
        runtime_Semacquire(&rw.readerSem)
    }
}

WaitGroup

func main() {
	var wg sync.WaitGroup
	wg.Add(1) // 增加一个等待者
	go func() {
		defer wg.Done()
		// 执行逻辑, 逻辑执行完之后, 等待者数量-1
	}()
	wg.Wait() // 等待所有等待者执行完成
}
type WaitGroup struct {
    noCopy noCopy
    // 64位值的高32位是计数器,低32位是等待者数量
    // 64位原子操作需要64位对齐,但32位编译器不能确保这一点
    state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
}
// WaitGroup状态布局(64位)
// 高32位:计数器 (counter)
// 低32位:等待者数量 (waiters)
func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    // 更新计数器
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    // 检查状态变化
    v := int32(state >> 32)  // 计数器
    w := uint32(state)       // 等待者数量
    if v > 0 || w == 0 {
        return // 还有工作要做,或者没有等待者
    }
    // 所有工作完成,唤醒等待者
    if *statep != 0 {
        panic("sync: WaitGroup misuse")
    }
    // 唤醒所有等待的goroutine
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

具体

  1. 高32位 = 工作计数器
    wg.Add(5) // 高32位 += 5,表示还有5个工作要做
    wg.Done() // 高32位 -= 1,表示完成了1个工作
  1. 低32位 = 等待者数量
    wg.Wait() // 低32位 += 1,表示1个goroutine开始等待

当调用Wait方法时,也是一个自旋等待的逻辑。配合cas,实现等待所有工作完成。具体细节参考源码,这里只简单介绍下原理

Once

func main() {
	once := sync.Once{}
	once.Do(func() {
		// 这里的逻辑只会执行一次
	})
}
func (o *Once) Do(f func()) {
	if o.done.Load() == 0 {
		o.doSlow(f)
	}
}
func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done.Load() == 0 {
		defer o.done.Store(1)
		f()
	}
}

Cond

func main() {
	cond := sync.NewCond(new(sync.Mutex))
	done := false
	read := func(name string, c *sync.Cond) {
		c.L.Lock()
		if !done {
			c.Wait()
		}
		fmt.Println("start reading: ", name)
		c.L.Unlock()
	}
	write := func(name string, c *sync.Cond) {
		fmt.Println("start writing: ", name)
		c.L.Lock()
		done = true
		c.L.Unlock()
		fmt.Println("wakes all:", name)
		c.Signal()
	}
	go read("read3", cond)
	go read("read1", cond)
	go read("read2", cond)
	write("write1", cond)
	time.Sleep(5 * time.Second)
}

Map

Pool

// 频繁分配和回收的典型特征
func frequentAllocation() {
    pool := sync.Pool{
        New: func() interface{} {
            return make([]byte, 1024)
        },
    }
    // 在循环中重复创建和销毁对象
    for i := 0; i < 1000; i++ {
        // 分配:创建新对象
        // buf := make([]byte, 1024)  // 直接分配内存
        buf := pool.Get().([]byte) // 使用对象池
        // 使用对象
        copy(buf, data[i])
        process(buf)
        // 回收:对象超出作用域,被GC回收
        pool.Put(buf[:0]) // 归还对象(清空)
    }
    // 每次循环都经历:分配→使用→回收
}

到此这篇关于深度剖析golang中的sync包的文章就介绍到这了,更多相关goland sync包内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文