当前位置: 首页 > 知识库问答 >
问题:

在cadence工作流中的循环内调用相同的活动

伯茂才
2023-03-14

我在cadence workflow中有一个问题,我们可以在for循环中用不同的输入调用相同的活动吗?代码是确定性的吗?如果执行工作流的工作人员在执行过程中被停止并在稍后重新启动,cadence是否能够在重新构建工作流时重放事件。

例如,我有以下代码。

   func init() {
    workflow.RegisterWithOptions(SampleWorkFlow, workflow.RegisterOptions{Name: "SampleWorkFlow"})
    activity.RegisterWithOptions(SampleActivity, activity.RegisterOptions{Name: "SampleActivity"})
    activity.RegisterWithOptions(SecondActivity, activity.RegisterOptions{Name: "SecondActivity"})
}

// SampleWorkFlow comment
func SampleWorkFlow(ctx workflow.Context, input string) error {

    fmt.Println("Workflow started")
    ctx = workflow.WithTaskList(ctx, sampleTaskList)
    ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)

    var result string
    err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
    if err != nil {
        return err
    }

    for i := 1; i <= 10; i++ {
        value := i
        workflow.Go(ctx, func(ctx workflow.Context) {
            err := workflow.ExecuteActivity(ctx, "SecondActivity", input, value).Get(ctx, &result)
            if err != nil {
                log.Println("err=", err)
            }
        })
    }

    return nil

}

// SampleActivity comment
func SampleActivity(ctx context.Context, value, v1 string) (string, error) {
    fmt.Println("Sample activity start")
    for i := 0; i <= 10; i++ {
        fmt.Println(i)
    }
    return "Hello " + value, nil
}

// SecondActivity comment
func SecondActivity(ctx context.Context, value int) (string, error) {

    fmt.Println("Second  activity start")

    fmt.Println("value=", value)
    fmt.Println("Second activity going to end")
    return "Hello " + fmt.Sprintf("%d", value), nil
}

这里,第二个活动是在for循环中并行调用的。我的第一个问题是,这个代码是确定性的吗?

假设在循环5次迭代后,当i=5时,执行此工作流的工作程序终止,如果工作流在另一个工作程序中启动,抑扬顿挫是否能够重放事件?

你能回答我的问题吗?

共有1个答案

弘涛
2023-03-14

是的,这个代码是确定性的。它不调用任何非确定性操作(如随机或UUID生成),并使用< code >工作流。Go启动goroutine。所以是确定性的。代码的复杂性在定义其确定性时不起作用。

不相关的尼特。无需在示例中使用 goroutine,因为执行活动调用已经通过返回 Future 来非阻塞。因此,该示例可以简化为:

func SampleWorkFlow(ctx workflow.Context, input string) error {

    fmt.Println("Workflow started")
    ctx = workflow.WithTaskList(ctx, sampleTaskList)
    ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)

    var result string
    err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
    if err != nil {
        return err
    }

    for i := 1; i <= 10; i++ {
       workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
    }
    return nil
}

请注意,此示例仍然可能不会以您预期的方式执行,因为它会在没有等待活动完成的情况下完成工作流。所以这些活动根本不会开始。

下面是等待活动完成的代码:

java prettyprint-override">func SampleWorkFlow(ctx workflow.Context, input string) error {

    fmt.Println("Workflow started")
    ctx = workflow.WithTaskList(ctx, sampleTaskList)
    ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)

    var result string
    err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
    if err != nil {
        return err
    }
    var results []workflow.Future
    for i := 1; i <= 10; i++ {
        future := workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
        results = append(results, future)
    }
    for i := 0; i < 10; i++ {
        var result string
        err := results[i].Get(ctx, &result)
        if err != nil {
            log.Println("err=", err)
        }
    }
    return nil
}
 类似资料:
  • 假设我有用不同语言编写的工人(Java

  • 这个方法的问题是它总是返回零。看起来没有使用它的参数。然而,如果我按照下面的方式来写,那么一切都运行得很好。有没有人知道为什么第一种方法不起作用?

  • 我有一个由多台机器组成的网络,我正在使用cadence go客户端。 1号机需要登记活动 机器2需要注册工作流程。 机器3需要启动以启动工作流。 cadence前端服务在另一台机器上。 如何使用 go 客户端执行此操作?此外,收银机是否仅将工作流/活动保存在内存中?我怎样才能把它们推到节奏服务,以便其他机器也能找到它们。

  • 我正在评估使用Cadence来执行长时间运行的批量操作。我有以下代码: 这对于数量较少的实体很好,但我很快遇到了以下异常: 看起来我很快就耗尽了线程池,Cadence无法安排新任务。 我通过将的定义更改为: 这基本上是以200个块为单位处理项目,并等待每个块完成,然后再移动到下一个块。我担心这将执行得如何(在重试时,块中的单个错误将停止处理以下块中的所有记录)。我还担心Cadence在发生崩溃时能

  • 将是什么 线程不足,无法执行工作流。如果此消息始终显示,请选择WorkerOptions。应减小maxConcurrentWorklfowExecutionSize或WorkerOptions。maxWorkflowThreads增加。 处于阻塞状态的工作流在内存中保持活动状态??处于等待状态的工作流是否持续检查条件??更多的 -

  • 基本上,当工作流退出时(尤其是在我们不想在所有错误返回中重复调用ActivityCleanupError的错误情况下),我们都要执行catchall活动ActivityCleanupNoError和ActivityCLanupError。 这是否适用于分布式决策?例如,如果工作流决策的所有权从一个工作人员转移到另一个工作人员,是否会触发对原始工作人员的延迟? 额外的问题:日志记录器在每次工作流运行