当前位置: 首页 > 知识库问答 >
问题:

执行包含许多子工作流/活动的工作流时,Cadence引发WorkflowRejectedExecutionError

陈朗
2023-03-14

我正在评估使用Cadence来执行长时间运行的批量操作。我有以下代码:

class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {

    private val changeNamePromises = mutableListOf<Promise<ChangeNameResult>>()

    override fun updateNames(newName: String, entityIds: Collection<String>) {
        entityIds.forEach { entityId ->
            val childWorkflow = Workflow.newChildWorkflowStub(
                    UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
            )
            val promise = Async.function(childWorkflow::setName, newName, entityId)

            changeNamePromises.add(promise)
        }

        val allDone = Promise.allOf(changeNamePromises)
        allDone.get()
    }

    class UpdateNameSingleWorkflowImpl : UpdateNameBulkWorkflow.UpdateNameSingleWorkflow {
        override fun setName(newName: String, entityId: String): SetNameResult {
            return Async.function(activities::setName, newName, entityId).get()
        }
    }
}

这对于数量较少的实体很好,但我很快遇到了以下异常:

java.lang.RuntimeException: Failure processing decision task. WorkflowID=b5327d20-6ea6-4aba-b863-2165cb21e038, RunID=c85e2278-e483-4c81-8def-f0cc0bd309fd
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:283) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:229) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:76) ~[cadence-client-2.7.4.jar:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.uber.cadence.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f17a605[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@7fa9f240[Wrapped task = com.uber.cadence.internal.sync.WorkflowThreadImpl$RunnableWrapper@1a27000b]] rejected from java.util.concurrent.ThreadPoolExecutor@22188bd0[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 2400]
    at com.uber.cadence.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:281) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.sync.AsyncInternal.execute(AsyncInternal.java:300) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.sync.AsyncInternal.function(AsyncInternal.java:111) ~[cadence-client-2.7.4.jar:na]
...

看起来我很快就耗尽了线程池,Cadence无法安排新任务。

我通过将updateNames的定义更改为:

    override fun updateNames(newName: String, entityIds: Collection<String>) {

        entityIds.chunked(200).forEach { sublist ->
            val promises = sublist.map { entityId ->
                val childWorkflow = Workflow.newChildWorkflowStub(
                        UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
                )
                Async.function(childWorkflow::setName, newName, entityId)
            }

            val allDone = Promise.allOf(promises)
            allDone.get()
        }
    }

这基本上是以200个块为单位处理项目,并等待每个块完成,然后再移动到下一个块。我担心这将执行得如何(在重试时,块中的单个错误将停止处理以下块中的所有记录)。我还担心Cadence在发生崩溃时能够恢复此函数的进度。

我的问题是:有没有一种惯用的Cadence方法来做到这一点,而不会导致这种即时的资源枯竭?我是使用了错误的技术,还是这只是一种天真的方法?

共有1个答案

吕英豪
2023-03-14

Cadence工作流对单个工作流运行的大小有相对较小的限制。它随着并行工作流运行的次数而扩展。因此,在单个工作流中执行大量任务是一种反模式。

惯用的解决方案是:

  • 运行有限大小的 chank,然后以新名称调用“继续”。这样,单个运行大小是有界的。
  • 使用分层工作流。具有 1k 个子项的单一父级,每个子项执行 1000 个活动,允许执行 100 万个活动,并限制每个工作流历史记录的大小。
 类似资料:
  • 对于长时间运行的活动,我们可以使用心跳来通知活动是正在运行还是已死。 我们有一个工作流,该工作流调用多个子工作流,其中包含一些基于父工作流生成的分组的参数。子工作流是长时间运行的工作流。 有没有办法让类似的检测信号从子工作流发送,因为这些工作流长时间运行,超时设置为几个小时?或者更确切地说,通知子工作流正在运行的方法是什么? 我们正在使用go-client来实现工作流程。

  • 我有一个由多台机器组成的网络,我正在使用cadence go客户端。 1号机需要登记活动 机器2需要注册工作流程。 机器3需要启动以启动工作流。 cadence前端服务在另一台机器上。 如何使用 go 客户端执行此操作?此外,收银机是否仅将工作流/活动保存在内存中?我怎样才能把它们推到节奏服务,以便其他机器也能找到它们。

  • 在正常情况下,一个工作流是否会由多个工作流工作人员同时执行?因为多个工作流工作者可以投票决定任务来执行,如果没有,他该怎么做?

  • 将是什么 线程不足,无法执行工作流。如果此消息始终显示,请选择WorkerOptions。应减小maxConcurrentWorklfowExecutionSize或WorkerOptions。maxWorkflowThreads增加。 处于阻塞状态的工作流在内存中保持活动状态??处于等待状态的工作流是否持续检查条件??更多的 -

  • 在Cadence/Temoral工作流编程中: < li >不允许使用本机线程库。例如,在Java中,线程必须通过< code>Async.procedure或< code>Async.function创建,而在Golang中,线程必须通过< code>workflow创建。去吧。那为什么呢? < li >有没有类似使用本机线程的竞争条件?例如,为了线程安全,应该使用< code>Hashtabl

  • 为用户可视化节奏工作流的最佳方式是什么? 我想在一个高层次的视图中向用户展示工作流的不同步骤(类似于大多数食品配送应用程序的功能:下单- 我对向用户展示实际执行的节奏活动不感兴趣,因为我不希望他们看到我的工作流程的详细信息,我只想可视化他们感兴趣的某种高级阶段。 一种方法是保留工作流的高级描述,并在工作流代码本身内部进行状态转换(在启动活动 X 时将阶段 Y 标记为已启动等)。但是,我试图将这个问