安装方式
export GOPROXY=https://goproxy.cn/,https://mirrors.aliyun.com/goproxy/,direct
go get github.com/robfig/cron/v3
代码使用
package main
import (
"fmt"
"github.com/robfig/cron/v3"
"log"
"os"
"time"
)
func main() {
location, _ := time.LoadLocation("Asia/Shanghai")
c := cron.New(
//打印出 定时任务内部的日志到控制台
cron.WithLogger(cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))),
//按秒级别 解析 字符串
cron.WithSeconds(),
//设置定时任务时区
cron.WithLocation(location),
)
//添加定时任务,每隔一秒执行一次
entryId, err := c.AddFunc("*/1 * * * * *", func() {
fmt.Println("每隔一秒执行一次")
})
if err != nil {
fmt.Println(err)
}
fmt.Println(entryId)
//协程启动定时任务
go c.Start()
defer c.Stop()
select {
case <-time.After(time.Second * 3):
fmt.Println("结束定时任务")
}
}
源码实现功能原理
通过 Timer和channel为主要实现技术思路;
1.先解析任务表达式,算出距离当前时间 最近的下次执行的时刻,
2.然后 取出所有定时任务中最近的下次执行的时刻距离当前时间的 间隔时间 放到 Timer中
3.等timer到达时间后,通过channel通知相关任务执行,并再次设置下次任务执行时刻;
// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
c.logger.Info("start")
// Figure out the next activation times for each entry.
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
for {
// Determine the next entry to run.
// 将定时任务执行时间进行排序,最近最早执行的放在前面
sort.Sort(byTime(c.entries))
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
timer = time.NewTimer(100000 * time.Hour)
} else {
// 生成一个定时器,距离最近的任务时间到时 触发定时器的channel,发送通知
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
for {
select {
// 定时时间到了,执行定时任务,并设置下次执行的时刻
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// Run every entry whose next time was less than now
//对每个定时任务尝试执行
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
//新增的定时任务添加到 任务列表中
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
//获取 当前所有定时任务(快照)
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
//停止定时任务,timer停止即可完成此功能
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
//删除某个定时任务
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
break
}
}
}
注意:
1.具体定时任务的执行,是放在新的协程里执行的,且有recover方法,保证某个定时任务失败,不会影响到其他定时任务或其他协程;
2.此cron可以直接嵌入到 gin框架中使用