Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go语言 WaitGroup并发同步

Go语言中WaitGroup并发同步的实现

作者:王码码2035哦

本文主要介绍了Go语言中WaitGroup并发同步的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

写了十几年代码的Go后端老兵。今天聊聊WaitGroup,这个并发同步的小能手。

一、为什么需要WaitGroup

上周我们重构了一个数据导入功能,需要并发处理多个文件。但程序总是提前退出,有些文件还没处理完就结束了。

这就是没有做好并发同步的后果。WaitGroup可以帮我们等待所有goroutine完成。

二、WaitGroup的基本用法

1. 基础示例

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1) // 增加计数器
        go func(id int) {
            defer wg.Done() // 减少计数器
            fmt.Printf("Worker %d starting\n", id)
            time.Sleep(time.Second)
            fmt.Printf("Worker %d done\n", id)
        }(i)
    }
    wg.Wait() // 等待所有goroutine完成
    fmt.Println("All workers done")
}

2. 错误处理

func processFiles(files []string) error {
    var wg sync.WaitGroup
    errChan := make(chan error, len(files))
    for _, file := range files {
        wg.Add(1)
        go func(f string) {
            defer wg.Done()
            if err := processFile(f); err != nil {
                errChan <- err
            }
        }(file)
    }
    wg.Wait()
    close(errChan)
    for err := range errChan {
        if err != nil {
            return err
        }
    }
    return nil
}

三、WaitGroup的实战技巧

1. 批量处理

func batchProcess(items []Item, batchSize int) {
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, batchSize) // 限制并发数
    for _, item := range items {
        wg.Add(1)
        semaphore <- struct{}{} // 获取信号量
        go func(i Item) {
            defer wg.Done()
            defer func() { <-semaphore }() // 释放信号量
            process(i)
        }(item)
    }
    wg.Wait()
}

2. 超时控制

func processWithTimeout(items []Item, timeout time.Duration) error {
    var wg sync.WaitGroup
    done := make(chan struct{})
    go func() {
        for _, item := range items {
            wg.Add(1)
            go func(i Item) {
                defer wg.Done()
                process(i)
            }(item)
        }
        wg.Wait()
        close(done)
    }()
    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return fmt.Errorf("timeout")
    }
}

3. 动态添加任务

func crawler(urls []string) {
    var wg sync.WaitGroup
    urlChan := make(chan string, 100)
    // 启动固定数量的worker
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range urlChan {
                newUrls := fetch(url)
                for _, u := range newUrls {
                    urlChan <- u // 动态添加新URL
                }
            }
        }()
    }
    // 发送初始URL
    for _, url := range urls {
        urlChan <- url
    }
    close(urlChan)
    wg.Wait()
}

四、WaitGroup的常见陷阱

1. Add和Done不匹配

// 错误:在goroutine内部调用Add
for i := 0; i < 5; i++ {
    go func() {
        wg.Add(1) // 错误!可能还没Add就Wait了
        defer wg.Done()
        // ...
    }()
}
wg.Wait()
// 正确:在启动goroutine之前Add
for i := 0; i < 5; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        // ...
    }()
}
wg.Wait()

2. 复制WaitGroup

// 错误:WaitGroup被复制
func wrong(wg sync.WaitGroup) { // 值传递,复制了WaitGroup
    wg.Done()
}
// 正确:传递指针
func right(wg *sync.WaitGroup) {
    wg.Done()
}

3. 重复调用Done

// 错误:可能调用多次Done
func bad() {
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := doSomething(); err != nil {
            wg.Done() // 重复调用!
            return
        }
    }()
}

// 正确:只调用一次
func good() {
    wg.Add(1)
    go func() {
        defer wg.Done()
        doSomething()
    }()
}

五、WaitGroup vs Channel

场景推荐方案
等待多个goroutine完成WaitGroup
传递数据Channel
控制并发数Channel(信号量)
错误处理Channel + WaitGroup

六、性能优化

1. 避免过度并发

// 不好的做法:无限制并发
for _, item := range items {
    wg.Add(1)
    go func(i Item) {
        defer wg.Done()
        process(i)
    }(item)
}
// 好的做法:限制并发数
semaphore := make(chan struct{}, 10)
for _, item := range items {
    wg.Add(1)
    semaphore <- struct{}{}
    go func(i Item) {
        defer wg.Done()
        defer func() { <-semaphore }()
        process(i)
    }(item)
}

2. 复用goroutine

// 使用worker pool
func workerPool(jobs []Job, workers int) {
    var wg sync.WaitGroup
    jobChan := make(chan Job, len(jobs))
    
    // 启动固定数量的worker
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobChan {
                process(job)
            }
        }()
    }
    
    // 发送任务
    for _, job := range jobs {
        jobChan <- job
    }
    close(jobChan)
    
    wg.Wait()
}

七、总结

WaitGroup是并发编程的必备工具,用好它可以:

记住:能跑就行,别折腾。但该用WaitGroup的时候,一定要用对。

到此这篇关于Go语言中WaitGroup并发同步的实现的文章就介绍到这了,更多相关Go语言 WaitGroup并发同步内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文