golang批量执行任务的通用模板分享
脚本之家 / 编程助手:解决程序员“几乎”所有问题!
脚本之家官方知识库 → 点击立即使用
需求
一个接口调用时,接收到一个列表,十个元素,需要并发执行十个任务,每个任务都要返回执行的结果和异常,然后对返回的结果装填到一个切片列表里,统一返回结果。
需要协程处理的结构体
确定通道数量
一般按入参的需要处理的元素数量为准
初始化通道
1 2 | orderCh := make ( chan Order, taskNum) //接收返回的结果 errCh := make ( chan error , taskNum) //接收返回的异常 |
发起执行
我们使用sync.WaitGroup来监听执行情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | wg := sync.WaitGroup{} for i:= 0 ; i < taskNum; i++ { wg.Add( 1 ) go func () { defer wg.Done() if i == 3 { //模拟当i=3的时候,返回一个异常 err := errors. New ( "there is an error" ) errCh <- err return } //组装返回结果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <- res }() } wg.Wait() //等待所有任务执行完毕 |
使用for-select接收执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | orderList := make ([]Order, taskNum) for i:= 0 ; i<taskNum; i++ { select { case order, ok := <-orderCh: //接收orderCh if ok { orderList = append (orderList, order) } case err := <-errCh: //接收errCh if err != nil { return err //看需求,这里设计发现一个错误就直接停止执行,返回错误 } default : fmt. Println ( "done" ) } } //处理完数据,关闭通道 close (orderCh) close (errCh) |
超时问题
任务执行过程中,需要控制每个任务的执行时间,不能超过一定范围,我们用定时器来解决这个问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | timeoutTime := time.Second * 3 //超时时间 taskTimer := time.NewTimer(timeoutTime) //初始化定时器 orderList := make ([]Order, taskNum) for i:= 0 ; i<taskNum; i++ { select { .... case <-taskTimeout.C: //处理超时 err := errors. New ( "task timeout" ) //此处我们认为超时是错误的一种,赋值给了err return ... } //每次执行都需要重置定时器 taskTimer.Reset(timeoutTime) } |
协程panic问题
主程序是无法捕捉协程内的panic,因此如果不手动处理,就会发生协程内panic导致整个程序中止的情况,我们在defer里处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | for i:= 0 ; i < taskNum; i++ { wg.Add( 1 ) go func () { defer func () { wg.Done() //协程内单独捕捉异常 if r := recover (); r != nil { err := errors. New (fmt.Sprintf( "System panic:%v" , r)) errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理 return } }() ........ }() } |
顺序问题
返回的列表元素的顺序,需要跟传参的列表顺序保持一致,这时我们需要定义个带序号的结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | // 需要记录原始顺序的时候,定义个带编号的结构体 type OrderWithSeq struct { Seq int OrderItem Order } //重写相关排序类型 type BySeq []OrderWithSeq func (a BySeq) Len () int { return len (a) } func (a BySeq) Swap(i, j int ) { a[i], a[j] = a[j], a[i] } func (a BySeq) Less(i, j int ) bool { return a[i].Seq < a[j].Seq } // 调整返回结果 orderCh := make ( chan OrderWithSeq, taskNum) //接收带序号的结构体 //在执行任务时,加入序号 for i:= 0 ; i < taskNum; i++ { i:= i wg.Add( 1 ) go func () { ···· //组装返回结果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <-OrderWithSeq { Seq: i, //带上i这个序号 OrderItem: res, } }() //接收信息,也按带序号的结构体进行组装 orderSeqList := make ([]OrderWithSeq, taskNum) for i:= 0 ; i<taskNum; i++ { select { case order, ok := <-orderCh: //接收orderCh if ok { orderList = append (orderSeqList, order) } ..... } } //按原始顺序进行排序 sort.Sort(BySeq(orderSeqList)) ....重新组装数据返回 |
总结
标准模板如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | type Order struct { Name string `json: "name" ` Id int `json: "id" ` } // 需要记录原始顺序的时候,定义个带编号的结构体 type OrderWithSeq struct { Seq int OrderItem Order } //重写相关排序类型 type BySeq []OrderWithSeq func (a BySeq) Len () int { return len (a) } func (a BySeq) Swap(i, j int ) { a[i], a[j] = a[j], a[i] } func (a BySeq) Less(i, j int ) bool { return a[i].Seq < a[j].Seq } taskNum := 10 orderCh := make ( chan OrderWithSeq, taskNum) //接收带序号的结构体 errCh := make ( chan error , taskNum) //接收返回的异常 wg := sync.WaitGroup{} //在执行任务时,加入序号 for i:= 0 ; i < taskNum; i++ { i:= i wg.Add( 1 ) go func () { defer func () { wg.Done() //协程内单独捕捉异常 if r := recover (); r != nil { err := errors. New (fmt.Sprintf( "System panic:%v" , r)) errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理 return } }() //组装返回结果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <-OrderWithSeq { Seq: i, //带上i这个序号 OrderItem: res, } }() wg.Wait() //接收信息,也按带序号的结构体进行组装 orderSeqList := make ([]OrderWithSeq, taskNum) timeoutTime := time.Second * 3 taskTimer := time.NewTimer(timeoutTime) for i:= 0 ; i<taskNum; i++ { select { case order, ok := <-orderCh: //接收orderCh if ok { orderList = append (orderSeqList, order) } case err := <-errCh: //接收errCh if err != nil { return err } case <-taskTimer.C: //处理超时 err := errors. New ( "task timeout" ) return default : fmt. Println ( "done" ) } taskTimer.Reset(timeoutTime) } close (orderCh) close (errCh) //按原始顺序进行排序 sort.Sort(BySeq(orderSeqList)) |
到此这篇关于golang批量执行任务的通用模板分享的文章就介绍到这了,更多相关golang批量执行任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
微信公众号搜索 “ 脚本之家 ” ,选择关注
程序猿的那些事、送书等活动等着你
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 reterry123@163.com 进行投诉反馈,一经查实,立即处理!
最新评论