Go语言并发编程之控制并发数量实现实例
作者:程序员Aike
这篇文章主要为大家介绍了Go语言并发编程之控制并发数量实例探究,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
今天我主要分享下Go语言并发编程如何控制并发数量。
适用场景
有一批数据需要并发处理,不能开启协程数量过多,以免服务器资源耗尽或者对服务造成过大压力,需要控制并发数量为N。
代码
话不多说,直接上代码,示例中采用三种方式进行处理。
1、以int数据集为例,并发数量为num;
2、第一种方式,并发函数报错则终止任务执行。后两种方式会等待所有处理任务执行完,再返回是否发生错误。
代码如下:
# utils.go package utils import ( "context" "fmt" "sync" "golang.org/x/sync/errgroup" ) // BatchDeal BatchDeal // TODO int类型待后续修改为泛型T func BatchDeal(ctx context.Context, records []int, num int, f func(context.Context, int) error) (err error) { ch := make(chan int, num) go func() { select { case <-ctx.Done(): return default: } for _, v := range records { ch <- v } close(ch) }() errCh := make(chan error, len(records)) go func() { goN(num, func(i int) { select { case <-ctx.Done(): errCh <- ctx.Err() return default: } for v := range ch { if er := f(ctx, v); er != nil { errCh <- er } } })() // 处理完关闭errCh close(errCh) }() // 有错误就结束或者关闭errCh后执行 err = <-errCh if err != nil { fmt.Printf("batch deal fail, err=%v", err) return } fmt.Println("batch deal end") return } func goN(n int, fn func(int)) func() { var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func(i int) { fn(i) wg.Done() }(i) } return wg.Wait } // BatchDeal2 BatchDeal2 // TODO int类型待后续修改为泛型T func BatchDeal2(ctx context.Context, records []int, num int, f func(context.Context, int) error) (err error) { ch := make(chan int, num) go func() { select { case <-ctx.Done(): return default: } for _, v := range records { ch <- v } close(ch) }() err = groupN(ctx, num, func(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() default: } for v := range ch { if err := f(ctx, v); err != nil { return err } } return nil }) if err != nil { fmt.Printf("batch deal fail, err=%v", err) return } fmt.Println("batch deal end") return } // groupN n为并发数量 func groupN(ctx context.Context, n int, fn func(context.Context) error) error { group, ctx := errgroup.WithContext(ctx) for i := 0; i < n; i++ { group.Go(func() error { if err := fn(ctx); err != nil { return err } return nil }) } return group.Wait() } // BatchDeal3 BatchDeal3 // TODO int类型待后续修改为泛型T func BatchDeal3(ctx context.Context, records []int, num int, f func(context.Context, int) error) (err error) { group, ctx := errgroup.WithContext(ctx) // 并发控制channel,并发数量为num ch := make(chan struct{}, num) for _, v := range records { // 元素进channel,并发超过10则阻塞 ch <- struct{}{} vCopy := v group.Go(func() error { // 释放元素 defer func() { <-ch }() if err := f(ctx, vCopy); err != nil { return err } return nil }) } // 等待执行完毕,全部执行完毕才会结束 if err = group.Wait(); err != nil { fmt.Printf("batch deal failed, err=%v", err) } fmt.Println("batch deal end") return } # utils_test.go package utils import ( "context" "fmt" "testing" "time" "github.com/stretchr/testify/assert" ) // TestBatchDeal func TestBatchDeal(t *testing.T) { records := make([]int, 0) for i := 1; i < 100; i++ { records = append(records, i) } num := 10 testAssert := assert.New(t) testF := func(ctx context.Context, i int) error { fmt.Printf("args=%d", i) time.Sleep(time.Duration(i * int(time.Millisecond))) return nil } testFailF := func(ctx context.Context, i int) error { fmt.Printf("args=%d", i) time.Sleep(time.Duration(i * int(time.Millisecond))) var er error if i == 10 { fmt.Printf("error accour, i=%d\n", i) er = fmt.Errorf("err=%d", i) } return er } err := BatchDeal(context.Background(), records, num, testF) testAssert.Nil(err) err2 := BatchDeal(context.Background(), records, num, testFailF) testAssert.ErrorContains(err2, "err") err3 := BatchDeal2(context.Background(), records, num, testF) testAssert.Nil(err3) err4 := BatchDeal2(context.Background(), records, num, testFailF) testAssert.ErrorContains(err4, "err") err5 := BatchDeal3(context.Background(), records, num, testF) testAssert.Nil(err5) err6 := BatchDeal3(context.Background(), records, num, testFailF) testAssert.ErrorContains(err6, "err") }
以上就是Go语言并发编程之控制并发数量实现实例的详细内容,更多关于Go并发控制的资料请关注脚本之家其它相关文章!