当使用像文档建议的那样的信号时:
public class MyWorkflow{
public Output myWorkflwMethod(Input input){
...
}
public void mySignalMethod(request){
// do actual processing here.
...
}
}
我可能会遇到以下问题:
这四个是在Cadence/Temporal工作流中使用signal时最常见的错误。
您可以应用一种设计模式来一起解决所有问题。
其思想是简化信号处理程序,总是将信号放入一个队列中,而工作流方法将启动另一个工作流线程来处理该队列。
它基于样本(节奏
public class MyWorkflow{
private Queue<SignalRequest> signalRequestQueue = new LinkedList<>();
public void mySignalMethod(SignalRequest req){
signalRequestQueue.add(req);
}
public Output myWorkflwMethod(Input input){
//1. do everything necessary/needed before actually processing a signal
...
//2. spin up a workflow thread to process
Async.procedure(
() -> {
while (true) {
Workflow.await(() -> !signalRequestQueue.isEmpty());
final SignalRequest request = signalRequestQueue.poll();
processSignal(request);
}
});
//3. always wait for queue to be empty before completing/failing/continueAsNew the workflow
Workflow.await(() -> signalRequestQueue.isEmpty());
return output
}
private void processSignal(request){
// do your actual processing here.
// If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
...
}
}
您应该使用版本控制来进行迁移。
假设您有这样的现有代码;
public class MyWorkflow{
public Output myWorkflwMethod(Input input){
...
}
public void mySignalMethod(request){
// do your actual processing here.
...
}
}
然后应使用如下版本控制:
public class MyWorkflow{
private Queue<SignalRequest> signalRequestQueue = new LinkedList<>();
public void mySignalMethod(SignalRequest req){
int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
if( version == 1){
signalRequestQueue.add(req);
}else{
processSignal(req);
}
}
public Output myWorkflwMethod(Input input){
//1. do everything necessary/needed before actually processing a signal
...
int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
if( version == 1){
//2. spin up a workflow thread to process
Async.procedure(
() -> {
while (true) {
Workflow.await(() -> !signalRequestQueue.isEmpty());
final SignalRequest request = signalRequestQueue.poll();
processSignal(request);
}
});
}
//3. always wait for queue to be empty before completing/failing/continueAsNeww the workflow
Workflow.await(() -> signalRequestQueue.isEmpty());
return output
}
private void processSignal(request){
// do your actual processing here.
// If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
...
}
}
Golang SDK没有1/2/3的相同问题。这是因为Golang SDK提供了一个完全不同的API来处理信号。
Golang SDK不是将信号方法定义为处理程序,而是需要工作流侦听通道来处理信号,这正是这个答案在Java中建议做的事情。请参阅如何发出 API 信号的示例。(参见节奏/时间)
但它有问题#4——工作流可能会在处理信号之前提前完成。这是Golang SDK的一个常见错误。
建议在完成或继续作为新工作流之前,始终排空信号通道。请参阅此示例,了解如何在Golang中排空信号通道。
它类似于使用工作流。在Java中等待,等待处理所有信号。但是因为通道没有api来获取大小,所以我们必须使用“默认”分支来检查空值。
感谢@Maxim指出颞态 html" target="_blank">go sdk 中的 API - 或者,在颞态 go-sdk 中使用“已挂起”API 来检查是否使用了所有信号。
此外,建议监控“unhandledSignal”指标。
在Cadence/Temoral工作流编程中: < li >不允许使用本机线程库。例如,在Java中,线程必须通过< code>Async.procedure或< code>Async.function创建,而在Golang中,线程必须通过< code>workflow创建。去吧。那为什么呢? < li >有没有类似使用本机线程的竞争条件?例如,为了线程安全,应该使用< code>Hashtabl
在我的工作流和活动中,我想记录一些消息以进行调试。 我看到了< code >的节奏。GetLogger(ctx)。Info()函数,但是不知道在哪里可以找到日志。
我计划将 Cadence 或临时工作流用于架构,但我们计划在决定工作流时为用户提供很大的权力。在他们的用例中,节奏和时间都提到他们的SDK支持自定义DSL,但我看不到该功能。你能帮帮我吗?
我们有一个使用基于cron的调度的工作流程。我们需要支持一个用例来更改cron表达式。 这样做的最佳做法是什么?
除了之外,还有其他方法可以发出工作流拒绝信号吗? 基本上,我有一个工作流,它在超时前会定期继续运行。但是,如果它在继续像新的一样工作之前不断收到大量信号,它将会超时并丢失一些信号。我可以将< code > MaximumSignalsPerExecution 设置得更低,这样它就会在超时之前拒绝信号,但理想情况下,我希望能够在工作流级别进行配置。 我正在测试一些更糟糕的情况,其中存在流量峰值,并且
有一个 K8S 群集,我们的大多数部署只是更新映像的版本,但有时我们也希望更新部署配置的某些部分。我们的部署配置不包括映像的标记。 对于更新映像版本似乎是我最好的选择。至于一起更新部署配置和映像,我看到了几种方法: < li>kubectl部署...:< code>kubectl集合图像...[但是有两个部署] < li >使用实际图像标记编辑部署YAML[看起来不太优雅] < li>kubect