当前位置: 首页 > 知识库问答 >
问题:

go 有没有能持久化的分布式定时任务库?

林鹏鹍
2023-09-04

写了个基于kratos的服务,基本需求是:可以增删管理定时任务(如配置每天、每周发个统计报告),支持分布式结构,持久化任务。加分项包括:注册回调、结果和日志记录、失败重试等。

看kratos已经支持的transport有两个:asynq和machinery,但似乎都不满足需要基本的持久化任务需求,machinery甚至还不能删除已经添加的任务。

// RegisterPeriodicTask register a periodic task which will be triggered periodicallyfunc (server *Server) RegisterPeriodicTask(spec, name string, signature *tasks.Signature) error {    //check spec    schedule, err := cron.ParseStandard(spec)    if err != nil {        return err    }    f := func() {        //get lock        err := server.lock.LockWithRetries(utils.GetLockName(name, spec), schedule.Next(time.Now()).UnixNano()-1)        if err != nil {            return        }        //send task        _, err = server.SendTask(tasks.CopySignature(signature))        if err != nil {            log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", name, err.Error())        }    }    //scheduler基于github.com/robfig/cron/v3    _, err = server.scheduler.AddFunc(spec, f)    return err}

考虑到服务可能会重启,那么添加过的任务如何恢复呢?如果没有这样的库,考虑基于github.com/go-co-op/gocron 自己封装一个,配置的任务保存在redis中,服务启动的时候全量初始化,之后订阅对应的主题,再通过分布式锁来保证执行的唯一性,这样是否可行?

共有1个答案

卜飞鸣
2023-09-04
package mainimport (    "fmt"    "github.com/go-co-op/gocron"    "github.com/go-redis/redis/v8"    "golang.org/x/net/context"    "sync")var (    redisClient *redis.Client    scheduler   *gocron.Scheduler    ctx         = context.Background())func init() {    redisClient = redis.NewClient(&redis.Options{        Addr: "localhost:6379",    })    scheduler = gocron.NewScheduler(time.UTC)}func main() {    var wg sync.WaitGroup    wg.Add(1)    loadTasksFromRedis()    scheduler.StartAsync()    wg.Wait()}func loadTasksFromRedis() {    // 从 Redis 获取所有任务    taskKeys, err := redisClient.Keys(ctx, "task:*").Result()    if err != nil {        fmt.Println("Error fetching tasks from Redis:", err)        return    }    for _, taskKey := range taskKeys {        taskData, err := redisClient.HGetAll(ctx, taskKey).Result()        if err != nil {            fmt.Println("Error fetching task data from Redis:", err)            continue        }        createTaskFromData(taskData)    }}func createTaskFromData(taskData map[string]string) {    // 解析 taskData 并创建任务    // ...    // 添加任务到调度器    // ...}func executeTask(taskData map[string]string) {    // 尝试获取分布式锁    lock, err := redisClient.SetNX(ctx, "lock:"+taskData["id"], 1, time.Duration(taskData["lockDuration"])*time.Second).Result()    if err != nil || !lock {        fmt.Println("Failed to acquire lock for task:", taskData["id"])        return    }    // 执行任务    // ...    // 释放分布式锁    redisClient.Del(ctx, "lock:"+taskData["id"])}
 类似资料:
  • redis提供了两种持久化的方式,分别是RDB(Redis DataBase)和AOF(Append Only File)。 RDB,简而言之,就是在不同的时间点,将redis存储的数据生成快照并存储到磁盘等介质上; AOF,则是换了一个角度来实现持久化,那就是将redis执行过的所有写指令记录下来,在下次redis重新启动时,只要把这些写指令从前到后再重复执行一遍,就可以实现数据恢复了。 其实R

  • 本文向大家介绍Redis 持久化有几种方式?相关面试题,主要包含被问及Redis 持久化有几种方式?时的应答技巧和注意事项,需要的朋友参考一下 Redis 的持久化有两种方式,或者说有两种策略: RDB(Redis Database):指定的时间间隔能对你的数据进行快照存储。 AOF(Append Only File):每一个收到的写命令都通过write函数追加到文件中。

  • 我有连接到我的数据库运行。我可以执行以下没有问题: 然而,在设置了JPA和持久类之后,我总是得到一个“未选择数据库”错误。看起来我不需要调整我的数据库配置(MySQL连接到Glassfish 3.1),否则上面的代码将无法工作。 正在拨打的电话: 我尝试过这个调用直接在MySQL工作台和它不工作。 这一个确实有效: 我一直在玩游戏,似乎无法在任何地方添加数据库名称(“人”)。以下是我目前掌握的情况

  • 我正在尝试在DigitalOcean的Kubernetes中运行一个Redis集群。作为一个poc,我只是尝试运行了我在网上找到的一个示例(https://github.com/sanderploegsma/redis-cluster/blob/master/redis-cluster.yml),该示例能够在使用Minikube本地运行时适当地旋转POD。 然而,当在Digital Ocean上运

  • 本文向大家介绍iOS中持久化方式有哪些?相关面试题,主要包含被问及iOS中持久化方式有哪些?时的应答技巧和注意事项,需要的朋友参考一下 属性列表文件 — NSUserDefaults 的存储,实际是本地生成一个 plist 文件,将所需属性存储在 plist 文件中 对象归档 — 本地创建文件并写入数据,文件类型不限 SQLite 数据库 — 本地创建数据库文件,进行数据处理 CoreData —