Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Golang定时任务框架GoCron

Golang定时任务框架GoCron的源码分析

作者:AlfredChaos

本文主要介绍了Golang定时任务框架GoCron的源码分析,原生的gocron存在一些问题,如任务列表维护不当、并发操作不可预测等,经过改进的gocron解决了这些问题,感兴趣的可以了解一下

背景说明

最近工作上有个开发定时任务的需求,调研一下后发现Golang并没有十分完善的定时任务库。
整理我这边的需求如下:

显然,现成的cron库无法满足我的需求。限定于工期,最终自己实现了一个粗糙的事件驱动定时器。

自己实现的事件驱动定时器

但这个事件驱动定时器具有以下的缺点:

综上,我需要着重考察现有的Golang任务调度框架,对任务定时器进行重新设计。

GoCron任务调度库

https://github.com/jasonlvhit/gocron

调用实例

package main

import (
	"fmt"
	"time"

	"github.com/jasonlvhit/gocron"
)

func task() {
	fmt.Println("I am running task.")
}

func taskWithParams(a int, b string) {
	fmt.Println(a, b)
}

func main() {
	// Do jobs without params
	gocron.Every(1).Second().Do(task)
	gocron.Every(2).Seconds().Do(task)
	gocron.Every(1).Minute().Do(task)
	gocron.Every(2).Minutes().Do(task)
	gocron.Every(1).Hour().Do(task)
	gocron.Every(2).Hours().Do(task)
	gocron.Every(1).Day().Do(task)
	gocron.Every(2).Days().Do(task)
	gocron.Every(1).Week().Do(task)
	gocron.Every(2).Weeks().Do(task)

	// Do jobs with params
	gocron.Every(1).Second().Do(taskWithParams, 1, "hello")

	// Do jobs on specific weekday
	gocron.Every(1).Monday().Do(task)
	gocron.Every(1).Thursday().Do(task)

	// Do a job at a specific time - 'hour:min:sec' - seconds optional
	gocron.Every(1).Day().At("10:30").Do(task)
	gocron.Every(1).Monday().At("18:30").Do(task)
	gocron.Every(1).Tuesday().At("18:30:59").Do(task)

	// Begin job immediately upon start
	gocron.Every(1).Hour().From(gocron.NextTick()).Do(task)

	// Begin job at a specific date/time
	t := time.Date(2019, time.November, 10, 15, 0, 0, 0, time.Local)
	gocron.Every(1).Hour().From(&t).Do(task)

	// NextRun gets the next running time
	_, time := gocron.NextRun()
	fmt.Println(time)

	// Remove a specific job
	gocron.Remove(task)

	// Clear all scheduled jobs
	gocron.Clear()

	// Start all the pending jobs
	<- gocron.Start()

	// also, you can create a new scheduler
	// to run two schedulers concurrently
	s := gocron.NewScheduler()
	s.Every(3).Seconds().Do(task)
	<- s.Start()
}

项目分析

这个工具库仅有三个文件:

gocron工具库

代码主要分为job和scheduler两个文件,gocron仅放置了回调方法和公共方法。项目整体架构如下:

gocron项目架构

gocron通过scheduler维护一个job列表,指定MAXJOBNUM最大工作队列,限制可执行的工作数大小。

// gocron/scheduler.go
// Scheduler struct, the only data member is the list of jobs.
// - implements the sort.Interface{} for sorting jobs, by the time nextRun
type Scheduler struct {
	jobs [MAXJOBNUM]*Job // Array store jobs
	size int             // Size of jobs which jobs holding.
	loc  *time.Location  // Location to use when scheduling jobs with specified times
}

这里需要更正一下,并不是全局列表,仅仅只是跟随调度器的生命周期。实际上,代码确实存在全局的默认调度器:

var (
	defaultScheduler = NewScheduler()
)

因此,可以直接调用。当然也支持实例化自己的调度器:

s := gocron.NewScheduler()
s.Every(3).Seconds().Do(task)
<- s.Start()

gocron是典型的链式调用,scheduler对象通过返回job对象,完成job对象的封装操作之后,加入调度器内部的jobs列表,再通过Start方法启动调度器监控协程,轮询列表中的jobs,一旦找到可执行的任务,就会启动协程运行job的Func对象。

// Job struct keeping information about job
type Job struct {
	interval uint64                   // pause interval * unit between runs
	jobFunc  string                   // the job jobFunc to run, func[jobFunc]
	//......
	funcs    map[string]interface{}   // Map for the function task store
	fparams  map[string][]interface{} // Map for function and  params of function
	//......
}

funcs维护一个map,缓存funcName到func的映射关系。具体封装在Do方法:

// gocron/job.go
// func (j *Job) Do(jobFun interface{}, params ...interface{}) error
fname := getFunctionName(jobFun)
j.funcs[fname] = jobFun
j.fparams[fname] = params
j.jobFunc = fname

在执行任务时,通过反射回调func:

// gocron/job.go
// func (j *Job) run() ([]reflect.Value, error)
result, err := callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
if err != nil {
	return nil, err
}

// gocron/gocron.go
func callJobFuncWithParams(jobFunc interface{}, params []interface{}) ([]reflect.Value, error) {
	f := reflect.ValueOf(jobFunc)
	if len(params) != f.Type().NumIn() {
		return nil, ErrParamsNotAdapted
	}
	in := make([]reflect.Value, len(params))
	for k, param := range params {
		in[k] = reflect.ValueOf(param)
	}
	return f.Call(in), nil
}

启动调度器时,启动监控协程:

// Start all the pending jobs
// Add seconds ticker
func (s *Scheduler) Start() chan bool {
	stopped := make(chan bool, 1)
	// ticker每秒产生一个信号
	ticker := time.NewTicker(1 * time.Second)

	go func() {
		for {
			// select选择器阻塞
			// case接收到信号则执行
			// 同时接收到多个信号则随机选择一个执行
			select {
			// ticker每秒产生一次信号
			// RunPending轮询jobs列表,寻找到了时间可执行的任务
			case <-ticker.C:
				s.RunPending()
			// stopped接收到停止信号,退出调度器协程
			case <-stopped:
				ticker.Stop()
				return
			}
		}
	}()

	return stopped
}

一个调度器一个协程,通过统一的调度协程去监控调度器任务列表内的任务。

// RunPending runs all the jobs that are scheduled to run.
func (s *Scheduler) RunPending() {
	// 轮询jobs列表,找到到时间可执行的任务,创建可执行任务列表
	runnableJobs, n := s.getRunnableJobs()

	if n != 0 {
		for i := 0; i < n; i++ {
			// 启动协程运行
			go runnableJobs[i].run()
			// 刷新job执行信息,等待下一轮调度
			runnableJobs[i].lastRun = time.Now()
			runnableJobs[i].scheduleNextRun()
		}
	}
}

综合分析

综上,gocron有如下好处:

但它的缺陷也同样明显:

新的GoCron分析

https://github.com/go-co-op/gocron
原gocron的作者居然住进ICU了,管理员说截止至2020年3月依然无法联系上他。愿他身体安康……gocron被fork后有了新的发展,赶紧扒下来学习一下

新的gocron新增了很多内容,依然围绕着Scheduler和Job进行链式操作,但新增了executor模块。executor仅负责执行Scheduler调度过来的任务。

项目架构

下面是项目README文档里公开的架构图:

啊!大佬们画图能不能认真一点啊虽然已经非常生动形象了

新功能

新版gocron支持了cron格式的语法

// cron expressions supported
s.Cron("*/1 * * * *").Do(task) // every minute

新增了异步和阻塞模式的两种调度方式

// you can start running the scheduler in two different ways:
// starts the scheduler asynchronously
s.StartAsync()
// starts the scheduler and blocks current execution path
s.StartBlocking()

通过设置信号量限制可同时运行的任务数量

// gocron/scheduler.go
// SetMaxConcurrentJobs limits how many jobs can be running at the same time.
// This is useful when running resource intensive jobs and a precise start time is not critical.
func (s *Scheduler) SetMaxConcurrentJobs(n int, mode limitMode) {
	// 通过对n的配置修改并发任务数的大小
	s.executor.maxRunningJobs = semaphore.NewWeighted(int64(n))
	// limitMode即当可执行任务达到最大并发量时,应该如何处理的逻辑
	// RescheduleMode:跳过本次执行,等待下一次调度
	// WaitMode:持续等待,知道可执行队列空出。但,由于等待的任务数积累,可能导致不可预知的后果,某些任务可能一直等不到执行
	s.executor.limitMode = mode
}

// gocron/executor.go
// 通过信号量的方式从最大数量中取一位
// 若通过,下一步可以执行函数
if e.maxRunningJobs != nil {
	if !e.maxRunningJobs.TryAcquire(1) {

		switch e.limitMode {
		case RescheduleMode:
			return
		case WaitMode:
			select {
			case <-stopCtx.Done():
				return
			case <-f.ctx.Done():
				return
			default:
			}

			if err := e.maxRunningJobs.Acquire(f.ctx, 1); err != nil {
				break

			}
		}
	}

	defer e.maxRunningJobs.Release(1)
}

gocron支持指定Job以单例模式运行。通过siglefilght工具库保证当前仅有一个可运行的Job

// gocron/job.go
// SingletonMode prevents a new job from starting if the prior job has not yet
// completed it's run
// Note: If a job is added to a running scheduler and this method is then used
// you may see the job run overrun itself as job is scheduled immediately
// by default upon being added to the scheduler. It is recommended to use the
// SingletonMode() func on the scheduler chain when scheduling the job.
func (j *Job) SingletonMode() {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.runConfig.mode = singletonMode
	j.jobFunction.limiter = &singleflight.Group{}
}

// gocron/executor.go
switch f.runConfig.mode {
case defaultMode:
	runJob()
case singletonMode:
	// limiter是singlefilght对象,Do方法内仅会执行一次,保证一次只运行一个任务
	_, _, _ = f.limiter.Do("main", func() (interface{}, error) {
		select {
		case <-stopCtx.Done():
			return nil, nil
		case <-f.ctx.Done():
			return nil, nil
		default:
		}
		runJob()
		return nil, nil
	})
}

gocron主要数据结构

主要分为schduler调度器,job任务,以及executor执行器对象

主要数据结构

追踪一下调用链的工作流程:

func NewScheduler(loc *time.Location) *Scheduler {
	//	这时已经将executor同步初始化完毕
	// scheduler和executor是一对一的关系
	executor := newExecutor()

	return &Scheduler{
		jobs:       make([]*Job, 0),
		location:   loc,
		running:    false,
		time:       &trueTime{},
		executor:   &executor,
		tagsUnique: false,
		timer:      afterFunc,
	}
}
if s.updateJob || s.jobCreated {
	job = s.getCurrentJob()
}

接下来确定Job的运行周期,并加入到任务列表

s.setJobs(append(s.Jobs(), job))

Every方法返回了新增Job的scheduler,此时scheduler的任务队列中存在一个Job就绪,等待下一步调度。

if job.error != nil {
	// delete the job from the scheduler as this job
	// cannot be executed
	s.RemoveByReference(job)
	return nil, job.error
}
// 还有很多判断条件,这里不一一列举

将Do方法将要执行的函数封装进Job。接下来判断schduler是否启动:如之前gocron一样,scheduler也是通过协程监听并执行启动任务协程的工作。

之前的scheduler,默认启动一个ticker,每秒去排序并轮询任务队列,从中取出满足条件的任务开始执行,效率非常低。而现在的改进是:scheduler启动监听协程后;不是以轮询而是以通知的方式,从channel中获取Job的Function,再启动协程去执行。

在这样的前提下,scheduler监听协程什么时候启动是位置的。此处添加一个判断,当scheduler启动时,同时启动runContinuous去完成Job的最后一步操作。若是scheduler没有启动,那么直接返回,等待scheduler启动后再完成操作。

// we should not schedule if not running since we can't foresee how long it will take for the scheduler to start
if s.IsRunning() {
	s.runContinuous(job)
}

通过这样的设计,在最终启动scheduler前后,都可以以动态的方式添加/移除任务。

// StartAsync starts all jobs without blocking the current thread
func (s *Scheduler) StartAsync() {
	if !s.IsRunning() {
		s.start()
	}
}

// StartBlocking starts all jobs and blocks the current thread.
// This blocking method can be stopped with Stop() from a separate goroutine.
func (s *Scheduler) StartBlocking() {
	s.StartAsync()
	s.startBlockingStopChanMutex.Lock()
	s.startBlockingStopChan = make(chan struct{}, 1)
	s.startBlockingStopChanMutex.Unlock()
	<-s.startBlockingStopChan
}

一般情况下,我们通过异步模式,启动对所有任务的监控

// start starts the scheduler, scheduling and running jobs
func (s *Scheduler) start() {
	// 启动监听协程,select选择器配合channel阻塞
	// 直到Job准备执行发送通知
	go s.executor.start()
	// 将scheduler置位为running
	s.setRunning(true)
	// 遍历所有任务,以递归的方式监控起来
	s.runJobs(s.Jobs())
}

比较有意思的是这个部分:

func (s *Scheduler) runJobs(jobs []*Job) {
	for _, job := range jobs {
		// 这个函数是一个递归调用
		// 这里对所有Job都以递归的方式监听着
		s.runContinuous(job)
	}
}

// 这是runContinuous的部分代码
job.setTimer(s.timer(nextRun, func() {
	if !next.dateTime.IsZero() {
		for {
			n := s.now().UnixNano() - next.dateTime.UnixNano()
			// 某个任务满足执行条件了,退出循环
			if n >= 0 {
				break
			}
			s.time.Sleep(time.Duration(n))
		}
	}
	// 递归执行本方法
	// runContinuous会判断当前Job是否可执行
	// 若不则退出,若可以则将Job设置为立即执行,并刷新执行时间
	// 若Job“立即执行”的标志已经置位,直接调用run发送通知给监听协程
	s.runContinuous(job)
}))

这样的设计太优雅了,大佬们的奇思妙想啊~

runJob := func() {
	f.incrementRunState()
	callJobFunc(f.eventListeners.onBeforeJobExecution)
	callJobFuncWithParams(f.function, f.parameters)
	callJobFunc(f.eventListeners.onAfterJobExecution)
	f.decrementRunState()
}

eventListeners封装了两个接口,用以在执行任务和完成任务后发送给用户事件通知。

综合分析

gocron进行了不少方面的优化:

最后

最后的最后,gocron依然无法满足我当前的需求,但已经不妨碍我对源码进行下一步的改造:

到此这篇关于Golang定时任务框架GoCron的源码分析的文章就介绍到这了,更多相关Golang定时任务框架GoCron内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

您可能感兴趣的文章:
阅读全文