Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > golang批量执行任务

golang批量执行任务的通用模板分享

作者:DianWang

这篇文章主要为大家详细介绍了golang实现批量执行任务的通用模板,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的小伙伴可以了解一下

需求

一个接口调用时,接收到一个列表,十个元素,需要并发执行十个任务,每个任务都要返回执行的结果和异常,然后对返回的结果装填到一个切片列表里,统一返回结果。

需要协程处理的结构体

type Order struct {  
  Name string `json:"name"`  
  Id int `json:"id"`  
}

确定通道数量

一般按入参的需要处理的元素数量为准

taskNum := 10 

初始化通道

orderCh := make(chan Order, taskNum) //接收返回的结果
errCh := make(chan error, taskNum) //接收返回的异常

发起执行

我们使用sync.WaitGroup来监听执行情况

wg := sync.WaitGroup{}
for i:=0; i < taskNum; i++ {
   wg.Add(1)
   go func() {
     defer wg.Done()
     if i == 3 {//模拟当i=3的时候,返回一个异常
         err := errors.New("there is an error")
         errCh <- err 
         return
     }
     //组装返回结果
     res := Order{  
         Name: "num: " + strconv.Itoa(i),  
         Id: i,  
         }
     orderCh <- res    
  }()
}
wg.Wait() //等待所有任务执行完毕

使用for-select接收执行结果

orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
    select {
        case order, ok := <-orderCh: //接收orderCh
        if ok {
            orderList = append(orderList, order)
        }
        case err := <-errCh: //接收errCh
        if err != nil {
            return err //看需求,这里设计发现一个错误就直接停止执行,返回错误
        }
        default:
        fmt.Println("done")
    }
}
//处理完数据,关闭通道
close(orderCh)
close(errCh)

超时问题

任务执行过程中,需要控制每个任务的执行时间,不能超过一定范围,我们用定时器来解决这个问题

timeoutTime := time.Second * 3  //超时时间
taskTimer := time.NewTimer(timeoutTime) //初始化定时器
orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
    select {
        ....
        case <-taskTimeout.C: //处理超时
            err := errors.New("task timeout") //此处我们认为超时是错误的一种,赋值给了err
            return
        ...
    }
    //每次执行都需要重置定时器
    taskTimer.Reset(timeoutTime)
}

协程panic问题

主程序是无法捕捉协程内的panic,因此如果不手动处理,就会发生协程内panic导致整个程序中止的情况,我们在defer里处理

for i:=0; i < taskNum; i++ {
   wg.Add(1)
   go func() {
     defer func () {
      wg.Done()
      //协程内单独捕捉异常  
      if r := recover(); r != nil {  
        err := errors.New(fmt.Sprintf("System panic:%v", r))  
        errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理
        return
      }
     }()
   ........
  }()
}

顺序问题

返回的列表元素的顺序,需要跟传参的列表顺序保持一致,这时我们需要定义个带序号的结构体

// 需要记录原始顺序的时候,定义个带编号的结构体  
type OrderWithSeq struct {  
    Seq int  
    OrderItem Order  
}  
//重写相关排序类型
type BySeq []OrderWithSeq  
  
func (a BySeq) Len() int {  
    return len(a)  
}  
func (a BySeq) Swap(i, j int) {  
    a[i], a[j] = a[j], a[i]  
}  
func (a BySeq) Less(i, j int) bool {  
    return a[i].Seq < a[j].Seq  
}
// 调整返回结果
orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体
//在执行任务时,加入序号
for i:=0; i < taskNum; i++ {
   i:= i
   wg.Add(1)
   go func() {
     ····
     //组装返回结果
     res := Order{  
         Name: "num: " + strconv.Itoa(i),  
         Id: i,  
         }
     orderCh <-OrderWithSeq {
         Seq: i, //带上i这个序号
         OrderItem: res,
     }
  }()
 //接收信息,也按带序号的结构体进行组装
 orderSeqList := make([]OrderWithSeq, taskNum)
 for i:=0; i<taskNum; i++ {
    select {
        case order, ok := <-orderCh: //接收orderCh
        if ok {
            orderList = append(orderSeqList, order)
        }
       .....
     }
   }
 //按原始顺序进行排序
sort.Sort(BySeq(orderSeqList))
....重新组装数据返回

总结

标准模板如下:

type Order struct {  
  Name string `json:"name"`  
  Id int `json:"id"`  
}

// 需要记录原始顺序的时候,定义个带编号的结构体  
type OrderWithSeq struct {  
    Seq int  
    OrderItem Order  
}  
//重写相关排序类型
type BySeq []OrderWithSeq  
  
func (a BySeq) Len() int {  
    return len(a)  
}  
func (a BySeq) Swap(i, j int) {  
    a[i], a[j] = a[j], a[i]  
}  
func (a BySeq) Less(i, j int) bool {  
    return a[i].Seq < a[j].Seq  
}

taskNum := 10 
orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体
errCh := make(chan error, taskNum) //接收返回的异常
wg := sync.WaitGroup{}
//在执行任务时,加入序号
for i:=0; i < taskNum; i++ {
   i:= i
   wg.Add(1)
   go func() {
     defer func () {
      wg.Done()
      //协程内单独捕捉异常  
      if r := recover(); r != nil {  
        err := errors.New(fmt.Sprintf("System panic:%v", r))  
        errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理
        return
      }
     }()
     //组装返回结果
     res := Order{  
         Name: "num: " + strconv.Itoa(i),  
         Id: i,  
         }
     orderCh <-OrderWithSeq {
         Seq: i, //带上i这个序号
         OrderItem: res,
     }
  }()
 wg.Wait()
  //接收信息,也按带序号的结构体进行组装
 orderSeqList := make([]OrderWithSeq, taskNum)
 timeoutTime := time.Second * 3 
 taskTimer := time.NewTimer(timeoutTime)
 for i:=0; i<taskNum; i++ {
    select {
        case order, ok := <-orderCh: //接收orderCh
        if ok {
            orderList = append(orderSeqList, order)
        }
        case err := <-errCh: //接收errCh
        if err != nil {
            return err
        }
        case <-taskTimer.C: //处理超时
        err := errors.New("task timeout")
        return
        default:
        fmt.Println("done")
     }
     taskTimer.Reset(timeoutTime)
   }
close(orderCh)
close(errCh)
 //按原始顺序进行排序
sort.Sort(BySeq(orderSeqList))

到此这篇关于golang批量执行任务的通用模板分享的文章就介绍到这了,更多相关golang批量执行任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文