我正在评估使用Cadence来执行长时间运行的批量操作。我有以下代码:
class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {
private val changeNamePromises = mutableListOf<Promise<ChangeNameResult>>()
override fun updateNames(newName: String, entityIds: Collection<String>) {
entityIds.forEach { entityId ->
val childWorkflow = Workflow.newChildWorkflowStub(
UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
)
val promise = Async.function(childWorkflow::setName, newName, entityId)
changeNamePromises.add(promise)
}
val allDone = Promise.allOf(changeNamePromises)
allDone.get()
}
class UpdateNameSingleWorkflowImpl : UpdateNameBulkWorkflow.UpdateNameSingleWorkflow {
override fun setName(newName: String, entityId: String): SetNameResult {
return Async.function(activities::setName, newName, entityId).get()
}
}
}
这对于数量较少的实体很好,但我很快遇到了以下异常:
java.lang.RuntimeException: Failure processing decision task. WorkflowID=b5327d20-6ea6-4aba-b863-2165cb21e038, RunID=c85e2278-e483-4c81-8def-f0cc0bd309fd
at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:283) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:229) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:76) ~[cadence-client-2.7.4.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.uber.cadence.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f17a605[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@7fa9f240[Wrapped task = com.uber.cadence.internal.sync.WorkflowThreadImpl$RunnableWrapper@1a27000b]] rejected from java.util.concurrent.ThreadPoolExecutor@22188bd0[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 2400]
at com.uber.cadence.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:281) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.sync.AsyncInternal.execute(AsyncInternal.java:300) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.sync.AsyncInternal.function(AsyncInternal.java:111) ~[cadence-client-2.7.4.jar:na]
...
看起来我很快就耗尽了线程池,Cadence无法安排新任务。
我通过将updateNames
的定义更改为:
override fun updateNames(newName: String, entityIds: Collection<String>) {
entityIds.chunked(200).forEach { sublist ->
val promises = sublist.map { entityId ->
val childWorkflow = Workflow.newChildWorkflowStub(
UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
)
Async.function(childWorkflow::setName, newName, entityId)
}
val allDone = Promise.allOf(promises)
allDone.get()
}
}
这基本上是以200个块为单位处理项目,并等待每个块完成,然后再移动到下一个块。我担心这将执行得如何(在重试时,块中的单个错误将停止处理以下块中的所有记录)。我还担心Cadence在发生崩溃时能够恢复此函数的进度。
我的问题是:有没有一种惯用的Cadence方法来做到这一点,而不会导致这种即时的资源枯竭?我是使用了错误的技术,还是这只是一种天真的方法?
Cadence工作流对单个工作流运行的大小有相对较小的限制。它随着并行工作流运行的次数而扩展。因此,在单个工作流中执行大量任务是一种反模式。
惯用的解决方案是:
对于长时间运行的活动,我们可以使用心跳来通知活动是正在运行还是已死。 我们有一个工作流,该工作流调用多个子工作流,其中包含一些基于父工作流生成的分组的参数。子工作流是长时间运行的工作流。 有没有办法让类似的检测信号从子工作流发送,因为这些工作流长时间运行,超时设置为几个小时?或者更确切地说,通知子工作流正在运行的方法是什么? 我们正在使用go-client来实现工作流程。
我有一个由多台机器组成的网络,我正在使用cadence go客户端。 1号机需要登记活动 机器2需要注册工作流程。 机器3需要启动以启动工作流。 cadence前端服务在另一台机器上。 如何使用 go 客户端执行此操作?此外,收银机是否仅将工作流/活动保存在内存中?我怎样才能把它们推到节奏服务,以便其他机器也能找到它们。
在正常情况下,一个工作流是否会由多个工作流工作人员同时执行?因为多个工作流工作者可以投票决定任务来执行,如果没有,他该怎么做?
将是什么 线程不足,无法执行工作流。如果此消息始终显示,请选择WorkerOptions。应减小maxConcurrentWorklfowExecutionSize或WorkerOptions。maxWorkflowThreads增加。 处于阻塞状态的工作流在内存中保持活动状态??处于等待状态的工作流是否持续检查条件??更多的 -
在Cadence/Temoral工作流编程中: < li >不允许使用本机线程库。例如,在Java中,线程必须通过< code>Async.procedure或< code>Async.function创建,而在Golang中,线程必须通过< code>workflow创建。去吧。那为什么呢? < li >有没有类似使用本机线程的竞争条件?例如,为了线程安全,应该使用< code>Hashtabl
为用户可视化节奏工作流的最佳方式是什么? 我想在一个高层次的视图中向用户展示工作流的不同步骤(类似于大多数食品配送应用程序的功能:下单- 我对向用户展示实际执行的节奏活动不感兴趣,因为我不希望他们看到我的工作流程的详细信息,我只想可视化他们感兴趣的某种高级阶段。 一种方法是保留工作流的高级描述,并在工作流代码本身内部进行状态转换(在启动活动 X 时将阶段 Y 标记为已启动等)。但是,我试图将这个问