原文地址:http://zero-tt.fun/go/cron/
说起 Go 的定时任务,不得不学习一波 robfig/cron 包,github地址
package main
import (
"fmt"
"time"
"github.com/robfig/cron/v3"
)
func main() {
job := cron.New(
cron.WithSeconds(), // 添加秒级别支持,默认支持最小粒度为分钟
)
// 每秒钟执行一次
job.AddFunc("* * * * * *", func() {
fmt.Printf("secondly: %v\n", time.Now())
})
job.Run() // 启动
}
cron 表达式格式可以自行百度,这里不再赘述。
需要强调的是,cron 默认支持到分钟级别,如果需要支持到秒级别,在初始化 cron 时,记得 cron.WithSeconds()
参数。
// 每分钟执行一次
job.AddFunc("0 * * * * *", func() {
fmt.Printf("minutely: %v\n", time.Now())
})
// 每小时执行一次
job.AddFunc("0 0 * * * *", func() {
fmt.Printf("hourly: %v\n", time.Now())
})
// 另一种写法
job.AddFunc("@hourly", func() {
fmt.Printf("hourly: %v\n", time.Now())
})
cron 提供的解析器,可以识别 @hourly
这种写法,类似的还有 daily
,weekly
,monthly
,yearly
,annually
。
cron 表达式无法直接实现,另辟蹊径。
// 固定时间间隔执行
job.AddFunc("@every 60s", func() {
fmt.Printf("every: %v\n", time.Now())
})
@every
也是解析器提供的功能,60s
这个写法,其实就是一个时间区间,类似的还有 1h
,1h30m
等,具体的格式可以通过 time.ParseDuration 获取。
job.Schedule(cron.ConstantDelaySchedule{Delay: time.Minute}, cron.FuncJob(func() {
fmt.Printf("every: %v\n", time.Now())
}))
这种写法是自己创建 job 的时候提供一个调度器,并设置每次执行的时间间隔,具体原理下文再分析。
注意:虽然 @every
和 Schedule
也能够实现每小时执行一次的这种任务,但是它和 @hourly
这种方式还是不同的,区别在于:@hourly
是在每个小时的开始的时候执行任务,换句话说,如果你在 11:55 分的时候启动了定时任务,那最近一次的执行时间是 12:00。但是 @every
和 Schedule
这种写法,下次的执行时间会是 12:55,也就是一小时后。
// 描述一个 job 如何循环执行
type Schedule interface {
// Next returns the next activation time, later than the given time.
// Next is invoked initially, and then each time the job is run.
Next(time.Time) time.Time
}
接口类型,定义了一个方法 Next(time.Time) time.Time
,用于返回任务下次的执行时间。
这也是 NewCron()
时的默认选择,提供了对 Cron 表达式的解析能力。具体实现在 spec.go 文件中,只需要了解它的 func (s *SpecSchedule) Next(t time.Time) time.Time
方法返回了 job 下次被调度的时间即可。
ConstantDelaySchedule 也是一样的,我们只需了解 func (schedule ConstantDelaySchedule) Next(t time.Time) time.Time
方法返回任务下次被调度的时间即可,具体的实现在 constantdelay.go 文件中。
type Job interface {
Run()
}
接口类型,定义定时任务,cron 调度一个 Job,就去执行 Job 的 Run() 方法。
type FuncJob func()
func (f FuncJob) Run() { f() }
FuncJob 实际就是一个 func()
类型,实现了 Run()
方法。
修饰器可以有多种,先定义一下修饰器的类型,关于修饰器的说明,可以看我另一篇文章《Go 修饰器》
type JobWrapper func(Job) Job
func SkipIfStillRunning(logger Logger) JobWrapper {
var ch = make(chan struct{}, 1)
ch <- struct{}{}
return func(j Job) Job {
return FuncJob(func() { // 这个外层 func(),封装了真实的用户期望执行的 func()
select {
case v := <-ch:
j.Run() // 这里才是在执行我们真实的 Job
ch <- v
default:
logger.Info("skip")
}
})
}
}
简单理解为该装饰器给 Job 加了一个锁,就是那个大小为 1 的 chan,获取到锁这个 Job 才能执行,获取不到直接 logger.Info()
使用示例:
job.AddJob("@every 1s", cron.SkipIfStillRunning(cron.DefaultLogger)(cron.FuncJob(func() {
time.Sleep(time.Second * 3)
fmt.Printf("SkipIfStillRunning: %v", time.Now())
})))
当然,你也可以在创建 cron 时就使用 chain,这将会对所有 Job 起作用
jobs := cron.New(
cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger)),
)
func DelayIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
var mu sync.Mutex
return FuncJob(func() {
start := time.Now()
mu.Lock()
defer mu.Unlock()
// 阻塞超过 1 分钟,log 记录
if dur := time.Since(start); dur > time.Minute {
logger.Info("delay", "duration", dur)
}
j.Run()
})
}
}
和 SkipIfStillRunning 的实现思路是一样的,少了 default 分之,导致了阻塞,而不是直接 log。
由于修饰器可能存在多个,多个修饰器用在一个 Job 上,像套娃一样,一层又一层。
// 所有修饰器的载体
type Chain struct {
wrappers []JobWrapper
}
// 创建修饰器载体
func NewChain(c ...JobWrapper) Chain {
return Chain{c}
}
// 修饰器应用到 Job,一层一层的套
// 假如是:NewChain(m1, m2, m3).Then(job)
// 相当于:m1(m2(m3(job)))
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
j = c.wrappers[len(c.wrappers)-i-1](j)
}
return j
}
type Entry struct {
ID EntryID // job id,可以通过该 id 来删除 job
Schedule Schedule // 用于计算 job 下次的执行时间
Next time.Time // job 下次执行时间
Prev time.Time // job 上次执行时间,没执行过为 0
WrappedJob Job // 修饰器加工过的 job
Job Job // 未经修饰的 job,可以理解成就是 AddFunc 的第二个参数
}
结构体字段,上文已经解释清楚了
type Cron struct {
entries []*Entry // Job 集合
chain Chain // 装饰器链
stop chan struct{} // 停止信号
add chan *Entry // add 信号
remove chan EntryID // remove 信号
snapshot chan chan []Entry // 快照
running bool // 是否正在运行
logger Logger // 日志
runningMu sync.Mutex // 运行时锁
location *time.Location // 时区相关
parser Parser // Cron 解析器
nextID EntryID //
jobWaiter sync.WaitGroup // 正在运行的 Job
}
func (c *Cron) run() {
c.logger.Info("start")
// 计算每个 Job 下次的执行时间
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
// 一个死循环,进行任务调度
for {
// 根据下一次的执行时间,对所有 Job 排序
sort.Sort(byTime(c.entries))
// 计时器,用于没有任务可调度时的阻塞操作
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// 无任务可调度,设置计时器到一个很大的值,把下面的 for 阻塞住
timer = time.NewTimer(100000 * time.Hour)
} else {
// 有任务可调度了,计时器根据第一个可调度任务的下次执行时间设置
// 排过序,所以第一个肯定是最先被执行的
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
for {
select {
// 有 Job 到了执行时间
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// 检查所有 Job,执行到时的任务
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
// 执行 Job 的 func()
c.startJob(e.WrappedJob)
e.Prev = e.Next
// 设置 Job 下次的执行时间
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
// 添加新 Job
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
// 获取所有 Job 的快照
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
// 停止调度
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
// 根据 entryId 删除一个 Job
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
break
}
}
}
cron 包主要包含了哪些组件: