Go实现后台任务调度系统的实例代码
作者:堆栈future
平常我们在开发API的时候,前端传递过来的大批数据需要经过后端处理,如果后端处理的速度快,前端响应就快,反之则很慢,影响用户体验,为了解决这一问题,需要我们自己实现后台任务调度系统,本文将介绍如何用Go语言实现后台任务调度系统,需要的朋友可以参考下
一、背景
平常我们在开发API的时候,前端传递过来的大批数据需要经过后端处理,如果后端处理的速度快,前端响应就快,反之则很慢,影响用户体验。针对这种场景我们一般都是后台异步处理,不需要前端等待所有的都执行完才返回。为了解决这一问题,需要我们自己实现后台任务调度系统。
二、任务调度器实现
poll.go
package poller import ( "context" "fmt" "log" "sync" "time" ) type Poller struct { routineGroup *goroutineGroup // 并发控制 workerNum int // 记录同时在运行的最大goroutine数 sync.Mutex ready chan struct{} // 某个goroutine已经准备好了 metric *metric // 统计当前在运行中的goroutine数量 } func NewPoller(workerNum int) *Poller { return &Poller{ routineGroup: newRoutineGroup(), workerNum: workerNum, ready: make(chan struct{}, 1), metric: newMetric(), } } // 调度器 func (p *Poller) schedule() { p.Lock() defer p.Unlock() if int(p.metric.BusyWorkers()) >= p.workerNum { return } select { case p.ready <- struct{}{}: // 只要满足当前goroutine数量小于最大goroutine数量 那么就通知poll去调度goroutine执行任务 default: } } func (p *Poller) Poll(ctx context.Context) error { for { // step01 p.schedule() // 调度 select { case <-p.ready: // goroutine准备好之后 这里就会有消息 case <-ctx.Done(): return nil } LOOP: for { select { case <-ctx.Done(): break LOOP default: // step02 task, err := p.fetch(ctx) // 获取任务 if err != nil { log.Println("fetch task error:", err.Error()) break } fmt.Println(task) p.metric.IncBusyWorker() // 当前正在运行的goroutine+1 // step03 p.routineGroup.Run(func() { // 执行任务 if err := p.execute(ctx, task); err != nil { log.Println("execute task error:", err.Error()) } }) break LOOP } } } } func (p *Poller) fetch(ctx context.Context) (string, error) { time.Sleep(1000 * time.Millisecond) return "task", nil } func (p *Poller) execute(ctx context.Context, task string) error { defer func() { p.metric.DecBusyWorker() // 执行完成之后 goroutine数量-1 p.schedule() // 重新调度下一个goroutine去执行任务 这一步是必须的 }() return nil }
metric.go
package poller import "sync/atomic" type metric struct { busyWorkers uint64 } func newMetric() *metric { return &metric{} } func (m *metric) IncBusyWorker() uint64 { return atomic.AddUint64(&m.busyWorkers, 1) } func (m *metric) DecBusyWorker() uint64 { return atomic.AddUint64(&m.busyWorkers, ^uint64(0)) } func (m *metric) BusyWorkers() uint64 { return atomic.LoadUint64(&m.busyWorkers) }
goroutine_group.go
package poller import "sync" type goroutineGroup struct { waitGroup sync.WaitGroup } func newRoutineGroup() *goroutineGroup { return new(goroutineGroup) } func (g *goroutineGroup) Run(fn func()) { g.waitGroup.Add(1) go func() { defer g.waitGroup.Done() fn() }() } func (g *goroutineGroup) Wait() { g.waitGroup.Wait() }
三、测试
package main import ( "context" "fmt" "ta/poller" "go.uber.org/goleak" "testing" ) func TestMain(m *testing.M) { fmt.Println("start") goleak.VerifyTestMain(m) } func TestPoller(t *testing.T) { producer := poller.NewPoller(5) producer.Poll(context.Background()) }
结果:
四、总结
大家用别的方式也可以实现,核心要点就是控制并发节奏,防止大量请求打到task service
,在这里起到核心作用的就是schedule
,它控制着整个任务系统的调度。同时还封装了WaitGroup
,这在大多数开源代码中都比较常见,大家可以去尝试。另外就是test case
一定得跟上,防止goroutine
泄漏。
以上就是Go实现后台任务调度系统的实例代码的详细内容,更多关于Go后台任务调度系统的资料请关注脚本之家其它相关文章!