当前位置: 首页 > 知识库问答 >
问题:

在Cadence/Temporal工作流程中处理信号的最佳方式/模式是什么?

轩辕远
2023-03-14

当使用像文档建议的那样的信号时:

public class MyWorkflow{
   public Output myWorkflwMethod(Input input){
      ...
   }

   public void mySignalMethod(request){
     // do actual processing here. 
     ...
   }
}

我可能会遇到以下问题:

  1. 我想保证一次处理一个FIFO
  2. 我想处理signalWithStart的“赛车状态”,其中信号方法调用得太早
  3. 我想安全地重置工作流。重置后,可以在历史早期重新应用信号
  4. 我想确保工作流不会在信号处理之前提前完成

共有1个答案

卫劲
2023-03-14
  1. 保证FIFO按顺序一次处理一个
  2. 在信号方法过早调用的情况下处理signalWithStart的“竞争条件”。或者在现实中,对于没有signalWithStart的常规信号,信号可能在工作流准备好处理之前来得太早。
  3. 安全地重置工作流。重置后,信号可以在历史记录的早期重新应用
  4. 确保在处理信号之前不会提前完成工作流程

这四个是在Cadence/Temporal工作流中使用signal时最常见的错误。

您可以应用一种设计模式来一起解决所有问题。

其思想是简化信号处理程序,总是将信号放入一个队列中,而工作流方法将启动另一个工作流线程来处理该队列。

它基于样本(节奏

public class MyWorkflow{
   private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); 

   public void mySignalMethod(SignalRequest req){
       signalRequestQueue.add(req);
   }

   public Output myWorkflwMethod(Input input){
      //1. do everything necessary/needed before actually processing a signal
      ...

      //2. spin up a workflow thread to process 
      Async.procedure(
      () -> {
          while (true) {
              Workflow.await(() -> !signalRequestQueue.isEmpty());
              final SignalRequest request = signalRequestQueue.poll();
              processSignal(request);
          }
      });


      //3. always wait for queue to be empty before completing/failing/continueAsNew the workflow
      Workflow.await(() -> signalRequestQueue.isEmpty());
      return output
   }

   private void processSignal(request){
     // do your actual processing here. 
     // If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
     ...
   }
}

您应该使用版本控制来进行迁移。

假设您有这样的现有代码;

public class MyWorkflow{
   public Output myWorkflwMethod(Input input){
      ...
   }

   public void mySignalMethod(request){
     // do your actual processing here. 
     ...
   }
}

然后应使用如下版本控制:

public class MyWorkflow{
   private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); 

   public void mySignalMethod(SignalRequest req){
       int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
       if( version == 1){
          signalRequestQueue.add(req);
       }else{
          processSignal(req);
       }
   }

   public Output myWorkflwMethod(Input input){
      //1. do everything necessary/needed before actually processing a signal
      ...

       int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
       if( version == 1){
         //2. spin up a workflow thread to process 
         Async.procedure(
         () -> {
             while (true) {
                 Workflow.await(() -> !signalRequestQueue.isEmpty());
                 final SignalRequest request = signalRequestQueue.poll();
                 processSignal(request);
             }
         });
       }

      //3. always wait for queue to be empty before completing/failing/continueAsNeww the workflow
      Workflow.await(() -> signalRequestQueue.isEmpty());
      return output
   }

   private void processSignal(request){
     // do your actual processing here. 
     // If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
     ...
   }
}

Golang SDK没有1/2/3的相同问题。这是因为Golang SDK提供了一个完全不同的API来处理信号。

Golang SDK不是将信号方法定义为处理程序,而是需要工作流侦听通道来处理信号,这正是这个答案在Java中建议做的事情。请参阅如何发出 API 信号的示例。(参见节奏/时间)

但它有问题#4——工作流可能会在处理信号之前提前完成。这是Golang SDK的一个常见错误。

建议在完成或继续作为新工作流之前,始终排空信号通道。请参阅此示例,了解如何在Golang中排空信号通道。

它类似于使用工作流。在Java中等待,等待处理所有信号。但是因为通道没有api来获取大小,所以我们必须使用“默认”分支来检查空值。

感谢@Maxim指出颞态 html" target="_blank">go sdk 中的 API - 或者,在颞态 go-sdk 中使用“已挂起”API 来检查是否使用了所有信号。

此外,建议监控“unhandledSignal”指标。

 类似资料:
  • 在Cadence/Temoral工作流编程中: < li >不允许使用本机线程库。例如,在Java中,线程必须通过< code>Async.procedure或< code>Async.function创建,而在Golang中,线程必须通过< code>workflow创建。去吧。那为什么呢? < li >有没有类似使用本机线程的竞争条件?例如,为了线程安全,应该使用< code>Hashtabl

  • 在我的工作流和活动中,我想记录一些消息以进行调试。 我看到了< code >的节奏。GetLogger(ctx)。Info()函数,但是不知道在哪里可以找到日志。

  • 我计划将 Cadence 或临时工作流用于架构,但我们计划在决定工作流时为用户提供很大的权力。在他们的用例中,节奏和时间都提到他们的SDK支持自定义DSL,但我看不到该功能。你能帮帮我吗?

  • 我们有一个使用基于cron的调度的工作流程。我们需要支持一个用例来更改cron表达式。 这样做的最佳做法是什么?

  • 有一个 K8S 群集,我们的大多数部署只是更新映像的版本,但有时我们也希望更新部署配置的某些部分。我们的部署配置不包括映像的标记。 对于更新映像版本似乎是我最好的选择。至于一起更新部署配置和映像,我看到了几种方法: < li>kubectl部署...:< code>kubectl集合图像...[但是有两个部署] < li >使用实际图像标记编辑部署YAML[看起来不太优雅] < li>kubect

  • 除了之外,还有其他方法可以发出工作流拒绝信号吗? 基本上,我有一个工作流,它在超时前会定期继续运行。但是,如果它在继续像新的一样工作之前不断收到大量信号,它将会超时并丢失一些信号。我可以将< code > MaximumSignalsPerExecution 设置得更低,这样它就会在超时之前拒绝信号,但理想情况下,我希望能够在工作流级别进行配置。 我正在测试一些更糟糕的情况,其中存在流量峰值,并且