当前位置: 首页 > 工具软件 > gocron > 使用案例 >

Go框架进阶—定时任务 goCron

顾昊穹
2023-12-01

goCron是一个Golang作业调度工具,可以使用简单的语法定期执行go函数。

github:GitHub - jasonlvhit/gocron: A Golang Job Scheduling Package.

api doc:https://godoc.org/github.com/jasonlvhit/gocron#Every

使用实例

package main

import (
"fmt"
"github.com/jasonlvhit/gocron"
)

func task() {
	fmt.Println("I am runnning task.")
}

func taskWithParams(a int, b string) {
	fmt.Println(a, b)
}

func main() {
	//可并发运行多个任务
	//注意 interval>1时调用sAPi
	gocron.Every(2).Seconds().Do(task)
	gocron.Every(1).Second().Do(taskWithParams, 1, "hi")
	//在cron所有操作最后调用 start函数,否则start之后调用的操作无效不执行
	//<-gocron.Start()

	//在task执行过程中 禁止异常退出
	gocron.Every(1).Minute().DoSafely(taskWithParams, 1, "hello")

	// 支持在具体某一天、某天的某一时刻、每y-M-d h-m-s 执行任务
	gocron.Every(1).Monday().Do(task)
	gocron.Every(1).Thursday().Do(task)
	// function At() take a string like 'hour:min'
	gocron.Every(1).Day().At("10:30").Do(task)
	gocron.Every(1).Monday().At("18:30").Do(task)

	// 删除某一任务
	gocron.Remove(task)

	//删除所有任务
	gocron.Clear()

	//可同时创建一个新的任务调度 2个schedulers 同时执行
	s := gocron.NewScheduler()
	s.Every(3).Seconds().Do(task)
	<-s.Start()

	//防止多个集群中任务同时执行 task 实现lock接口
	//两行代码,对cron 设置lock实现,执行task时调用Lock方法再Do task
	gocron.SetLocker(lockerImplementation)
	gocron.Every(1).Hour().Lock().Do(task)

	<-gocron.Start()
}

源码浅析

 轻量 简洁的链式调用,看工程源码,简洁的……只有一个类。gocron当前支持最多1w个任务数,核心对象就是维护了job对象,所有执行的任务都放在jobs数组中,start方法底层调用go time包下的NewTicker,新启一个线程执行task方法。

外部调用的gocron.start func调用链

// Start all the pending jobs
// Add seconds ticker
func (s *Scheduler) Start() chan bool {
	stopped := make(chan bool, 1)
	ticker := time.NewTicker(1 * time.Second)

	go func() {
		for {
			select {
			case <-ticker.C:
				s.RunPending()    //调用RunPending 执行数组有序的任务队列
			case <-stopped:
				ticker.Stop()
				return
			}
		}
	}()

	return stopped
}

// RunPending runs all the jobs that are scheduled to run.
func (s *Scheduler) RunPending() {
	runnableJobs, n := s.getRunnableJobs()

	if n != 0 {
		for i := 0; i < n; i++ {
			runnableJobs[i].run()
		}
	}
}

//run方法,反射获取job的各属性,最终调用function.call方法执行任务函数
//Run the job and immediately reschedule it
func (j *Job) run() (result []reflect.Value, err error) {
	if j.lock {
		if locker == nil {
			err = fmt.Errorf("trying to lock %s with nil locker", j.jobFunc)
			return
		}
		key := getFunctionKey(j.jobFunc)

		if ok, err := locker.Lock(key); err != nil || !ok {
			return nil, err
		}

		defer func() {
			if e := locker.Unlock(key); e != nil {
				err = e
			}
		}()
	}

	f := reflect.ValueOf(j.funcs[j.jobFunc])
	params := j.fparams[j.jobFunc]
	if len(params) != f.Type().NumIn() {
		err = errors.New("the number of param is not adapted")
		return
	}
	in := make([]reflect.Value, len(params))
	for k, param := range params {
		in[k] = reflect.ValueOf(param)
	}
	result = f.Call(in)
	j.lastRun = time.Now()
	j.scheduleNextRun()
	return
}

这里需要关注一下,gocron对lock的实现,从代码上看Job结构体的lock属性,用于控制多实例job并发执行。但项目woner提到的 multiple instances 指的并不是跨服务器的多实例,而是在同一应用服务 里的多任务实例(也就是1个app服务中多个任务,粒度是只在统一应用内)。如果跨server则lock需要自行依赖redis或其他分布式锁来管理。通过读源码的run方法,j.lock来控制job并发,但一旦跨server job.lock属性是没法共享的。这里doc上给的解释有点歧义,需要注意。

If you need to prevent a job from running at the same time from multiple cron instances (like running a cron app from multiple servers), you can provide a Locker implementation and lock the required jobs. 然后owner给出了一个基于redis来做的lock 

Job结构体

// Job struct keeping information about job
type Job struct {
	interval uint64                   // pause interval * unit bettween runs
	jobFunc  string                   // the job jobFunc to run, func[jobFunc]
	unit     string                   // time units, ,e.g. 'minutes', 'hours'...
	atTime   time.Duration            // optional time at which this job runs
	lastRun  time.Time                // datetime of last run
	nextRun  time.Time                // datetime of next run
	startDay time.Weekday             // Specific day of the week to start on
	funcs    map[string]interface{}   // Map for the function task store
	fparams  map[string][]interface{} // Map for function and  params of function
	lock     bool                     // lock the job from running at same time form multiple instances
}

scheduler内维护JOBs array数组和大小,gocron scheduler最大可执行1w个job(大小可重写)。

// Scheduler struct, the only data member is the list of jobs.
// - implements the sort.Interface{} for sorting jobs, by the time nextRun
type Scheduler struct {
	jobs [MAXJOBNUM]*Job // Array store jobs ,const MAXJOBNUM = 10000
	size int             // Size of jobs which jobs holding.
}

对于执行时间的控制,均通过对job的unit属性进行设置,代码如下

// Seconds set the unit with seconds
func (j *Job) Seconds() *Job {
	return j.setUnit("seconds")
}

// Minutes set the unit with minute
func (j *Job) Minutes() *Job {
	return j.setUnit("minutes")
}

// Second set the unit with second
func (j *Job) Second() *Job {
	j.mustInterval(1)
	return j.Seconds()
}

// Minute set the unit  with minute, which interval is 1
func (j *Job) Minute() *Job {
	j.mustInterval(1)
	return j.Minutes()
}

// Sunday sets the job start day Sunday
func (j *Job) Sunday() *Job {
	return j.Weekday(time.Sunday)
}

// Every schedule a new periodic job with interval
func (s *Scheduler) Every(interval uint64) *Job {
	job := NewJob(interval)
	s.jobs[s.size] = job
	s.size++
	return job
}

简单总结

gocron代码总共570行,在java中但凡涉及到一点“通用工具”或“架构”的实现,除了多依赖之外,撸代码是少不了的。但在go中要实现满足基本功能的cron任务调度,没有多余依赖,纯基于gosdk本身,575行代码打完收工。这是在不接触go语言之前设想不到的事情。轻量好用,为中间件而生。

编程语言跟人类沟通语言一样,属性都是工具,透过这个工具无论作为人或是工程师,给我们打开的是另一个世界和景象。在对比中扬长避短,可对比的资源越多,越是能找到最优方案。不怕不知道,就怕不知道。

 类似资料: