当前位置: 首页 > 知识库问答 >
问题:

ReactorJava中反应性任务的顺序执行

常智勇
2023-03-14

我正在将阻塞顺序编排框架转换为反应式。现在,这些任务是动态的,并通过JSON输入输入到引擎中。引擎提取类并执行run()方法,并使用每个任务的响应保存状态
如何在Reactor中实现相同的链接?如果这是一个静态DAG,我会将它与flatMapthen操作符链接起来,但由于它是动态的,我如何继续执行反应性任务并收集每个任务的输出?

示例:
非反应性接口:

public interface OrchestrationTask {
 OrchestrationContext run(IngestionContext ctx);
}

核心发动机

public Status executeDAG(String id) {
  IngestionContext ctx = ContextBuilder.getCtx(id);
  List<OrchestrationTask> tasks = app.getEligibleTasks(id);

  for(OrchestrationTask task : tasks) {
    // Eligible tasks are executed sequentially and results are collected.
    OrchestrationContext stepContext = task.run(ctx);
    if(!evaluateResult(stepContext)) break;
  }
  return Status.SUCCESS;
}

按照上面的示例,如果我将任务转换为返回Mono

更新::

反应性任务示例。

public class SampleTask implements OrchestrationTask {  
  @Override
  public Mono<OrchestrationContext> run(OrchestrationContext context) {  
  // Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
  return Mono.just(context).delayElements(Duration.ofSeconds(2));
 }

因此,我将有一系列任务来完成各种事情,但是每个任务的响应依赖于以前的任务,并存储在编排上下文中。每当发生错误时,业务流程上下文标志将设置为false,通量应停止。

共有1个答案

解修然
2023-03-14

当然,我们可以:

  • 从任务列表中创建流量(如果以反应方式生成任务列表是合适的,那么可以直接用流量替换该arraylist,如果不是,则保持原样)

所以把所有这些放在一起,替换你的循环

Flux.fromIterable(tasks)
        .flatMap(task -> task.run(ctx))
        .takeWhile(stepContext -> evaluateResult(stepContext))
        .then(Mono.just(Status.SUCCESS));

(因为我们已经使它成为被动的,所以您的方法显然需要返回一个Mono

根据注释进行更新-如果您只是希望“一次执行一个”,而不是同时执行多个,则可以使用concatMap()而不是flatMap()

 类似资料:
  • 问题内容: 我已经搜索了很多,但是找不到任何解决方案。我以这种方式使用Java线程池: 以这种方式,任务以随后的顺序执行(如在队列中)。但是我需要更改“选择下一个任务”策略。所以我想为每个任务分配优先级(不是线程优先级),并执行与这些优先级相对应的任务。因此,当执行者完成另一个任务时,它将选择下一个任务作为具有最高优先级的任务。它描述了常见问题。也许有一种更简单的方法没有说明优先级。它选择上一个添

  • 问题内容: 我有三个连接的线程,即第二个线程在第一个死后执行。 这是我的代码: 我将如何使用而不是三个线程对象来实现此功能? 问题答案: 如果您想要/需要的是一个接一个地执行一组作业,但要在与主应用程序线程不同的单个线程中执行,请使用。

  • \android\app\build\中介\清单\完整\调试\AndroidManifest.xml:61: AAPT:错误:资源mipmap/ic_notif(又名...: mipmap/ic_notif)未找到。 错误:处理清单失败。 失败:构建失败,出现异常。 问题所在:任务“:app:processDebugResources”的执行失败。无法处理资源,有关详细信息,请参阅上面的 aapt

  • 我在调试中构建我的项目没有问题。我刚刚安装了一个新的npm包(bugsnag),然后做了react-本机链接,现在我不能再构建它了。我尝试了谷歌的许多建议解决方案,但都不起作用。 奇怪的部分,我试图建立我的项目的备份文件,它没有建立也有同样的错误,所以感觉像它与其他东西有关? 错误详细信息: .... 导致原因:Java . util . concurrent . execution except

  • 失败:生成失败,出现异常。 > 错误:任务“:app:CompiledEbugJavaWithJavac”执行失败。 编译失败;有关详细信息,请参阅编译器错误输出。

  • null 这工作得很好,但对于下面的命令,我得到一个错误 错误: > 错误:任务“:app:InstallRelease”执行失败。 com.android.builder.testing.api.deviceException:com.android.ddmlib.installexception:install_failed_update_incompatible:包io.nativebase