当前位置: 首页 > 知识库问答 >
问题:

如何使用RXJava顺序链接Vertx CompositeFuture?

汪高岑
2023-03-14

我需要以RxJava风格按顺序链接Vertx CompositeFuture,以获得相关CompositeFuture,避免回调地狱。

用例:

每个CompositeFuture.any/all做一些返回期货的异步操作,比如myList1、myList2、myList3,但是我必须等待CompositeFuture.any(myList1)完成并返回成功,然后再做CompositeFuture.any(myList2),从myList2到myList3也是如此。自然,CompositeFuture本身异步执行作业,但只是针对它的一组操作,因为下一组必须在第一组运行良好之后完成。

以“回调地狱风格”进行操作将是:

    public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {


        CompositeFuture.any(myList1 < Future >)
                .onComplete(ar1 -> {
                    if (!ar1.succeeded()) {
                        asyncResultHandler.handle(ar1);
                    } else {
                        CompositeFuture.any(myList2 < Future >)
                                .onComplete(ar2 -> {
                                            if (!ar2.succeeded()) {
                                                asyncResultHandler.handle(ar2);
                                            } else {
                                                CompositeFuture.all(myList3 < Future >)
                                                        .onComplete(ar3 -> {
                                                            asyncResultHandler.handle(ar3);
                                                        
    .... <ARROW OF CLOSING BRACKETS> ...
}

现在我尝试了这样的事情:

    public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {
        Single
                .just(CompositeFuture.any(myList1 < Future >))
                .flatMap(previousFuture -> rxComposeAny(previousFuture, myList2 < Future >))
                .flatMap(previousFuture -> rxComposeAll(previousFuture, myList3 < Future >))
                .subscribe(SingleHelper.toObserver(asyncResultHandler));
    }

    public static Single<CompositeFuture> rxComposeAny(CompositeFuture previousResult, List<Future> myList) {
        if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow

        CompositeFuture compositeFuture = CompositeFuture.any(myList);
        return Single.just(compositeFuture);
    }

    public static Single<CompositeFuture> rxComposeAll(CompositeFuture previousResult, List<Future> myList) {
        if (previousResult.failed()) return Single.just(previousResult);

        CompositeFuture compositeFuture = CompositeFuture.any(myList);
        return Single.just(compositeFuture);
    }
}

更加紧凑和清晰。但是,我没有成功地将前面的失败传递给asyncResultHandler。

我的想法如下:flatMap通过前面的CompositeFuture结果,我想检查它是否失败。下一个rxComposeAny/All首先检查前一个是否失败,如果失败,则只返回失败的CompositeFuture,依此类推,直到它到达订阅服务器中的处理程序。如果前一个通过了测试,我可以继续传递当前结果,直到最后一个成功的CompositeFuture命中处理程序

问题是支票

        if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow

不起作用,所有的CompositeFuture都已处理,但没有测试是否成功完成,只有最后一个最终被传递给asyncResultHandler,后者将测试整体失败(但在我的代码中,它只检查最后一个)

我正在使用Vertx 3.9.0和RxJava 2 Vertx API。

披露:我在Vertx方面有经验,但我对RxJava完全陌生。所以我很欣赏任何答案,从技术解决方案到概念解释。

非常感谢。

编辑(在@homerman的出色响应之后):我需要具有与顺序相关CompositeFuture的“回调地狱风格”完全相同的行为,即,必须在onComplete和test for completed with failure或success之后调用next。复杂性来自以下事实:

  1. 我必须使用vertx CompositeAll/任何方法,而不是zip。Zip提供类似于CompositeAll的行为,但不是CompositeAny。
  2. CompositeAll/any返回on完成方法中完成的未来。如果我像上面显示的那样检查它,因为它是异步的,我将得到未解析的未来。
  3. CompositeAll/任何如果失败不会抛出错误,而是在on完成中失败的未来,所以我不能使用rxJava中的onError。

例如,我在rxComposite函数中尝试了以下更改:

    public static Single<CompositeFuture> rxLoadVerticlesAny(CompositeFuture previousResult, Vertx vertx, String deploymentName,
                                                             List<Class<? extends Verticle>> verticles, JsonObject config) {
        previousResult.onComplete(event -> {
                    if (event.failed()) {
                        return Single.just(previousResult);

                    } else {
                        CompositeFuture compositeFuture = CompositeFuture.any(VertxDeployHelper.deploy(vertx, verticles, config));
                        return Single.just(compositeFuture);
                    }
                }
        );
    }

但自然它不会编译,因为lambda无效。我如何在Vertx中重现与rxJava完全相同的行为?

共有2个答案

仇正平
2023-03-14

在对Vertx源代码进行了一些研究之后,我发现了一种公共方法,CompositeFuture的rx版本使用该方法将“传统”CompositeFuture转换为其rx版本。方法是io。vertx。反应性X。果心合成未来。newInstance。有了这个变通方法,我可以使用我的传统方法,然后将其转换为在rx链中使用。这就是我想要的,因为改变现有的传统方法是有问题的。

以下是带有注释的代码:

rxGetConfig(vertx)
        .flatMap(config -> {
            return rxComposeAny(vertx, config)
                    .flatMap(r -> rxComposeAny(vertx, config))
                    .flatMap(r -> rxComposeAll(vertx, config));
        })
        .subscribe(
                compositeFuture -> {
                    compositeFuture.onSuccess(event -> startPromise.complete());
                },
                error -> startPromise.fail(error));


public static Single<JsonObject> rxGetConfig(Vertx vertx) {
    ConfigRetrieverOptions enrichConfigRetrieverOptions = getEnrichConfigRetrieverOptions();
    // the reason we create new vertx is just to get an instance that is rx
    // so this ConfigRetriever is from io.vertx.reactivex.config, instead of normal io.vertx.config
    ConfigRetriever configRetriever = ConfigRetriever.create(io.vertx.reactivex.core.Vertx.newInstance(vertx), enrichConfigRetrieverOptions);

    return configRetriever.rxGetConfig();
}

public static Single<io.vertx.reactivex.core.CompositeFuture> rxComposeAny(Vertx vertx, JsonObject config) {

    // instead of adapted all the parameters of myMethodsThatReturnsFutures to be rx compliant, 
    // we create it 'normally' and the converts bellow to rx CompositeFuture
    CompositeFuture compositeFuture = CompositeFuture.any(myMethodsThatReturnsFutures(config));

    return io.vertx.reactivex.core.CompositeFuture
            .newInstance(compositeFuture)
            .rxOnComplete();
}
公羊学义
2023-03-14

只是想澄清一下。。。

每个合成未来。任何/全部都会执行一些返回未来的异步操作,比如myList1、myList2、myList3,但我必须等待CompositeFuture。any(myList1)完成并返回success,然后再执行CompositeFuture。任何(myList2),从myList2到myList3都是相同的。

您提供了CompositeFuture.any()CompositeFuture.all()作为参考点,但是您描述的行为与all()一致,也就是说,只有当所有成分都成功时,生成的复合才会成功。

就我的回答而言,我假设all()是您期望的行为。

在RxJava中,异常触发的意外错误将导致流终止,底层异常将通过回调传递给观察者。

作为一个小型演示,假设以下设置:

final Single<String> a1 = Single.just("Batch-A-Operation-1");
final Single<String> a2 = Single.just("Batch-A-Operation-2");
final Single<String> a3 = Single.just("Batch-A-Operation-3");

final Single<String> b1 = Single.just("Batch-B-Operation-1");
final Single<String> b2 = Single.just("Batch-B-Operation-2");
final Single<String> b3 = Single.just("Batch-B-Operation-3");

final Single<String> c1 = Single.just("Batch-C-Operation-1");
final Single<String> c2 = Single.just("Batch-C-Operation-2");
final Single<String> c3 = Single.just("Batch-C-Operation-3");

每个单个表示要执行的一个离散操作,它们根据一些逻辑分组进行逻辑命名(即它们是要一起执行的)。例如,“Batch-A”对应“myList1”,Batch-B对应“myList2”。。。

假设以下流:

Single
    .zip(a1, a2, a3, (s, s2, s3) -> {
      return "A's completed successfully";
    })
    .flatMap((Function<String, SingleSource<String>>) s -> {
      throw new RuntimeException("B's failed");
    })
    .flatMap((Function<String, SingleSource<String>>) s -> {
      return Single.zip(c1, c2, c3, (one, two, three) -> "C's completed successfully");
    })
    .subscribe(
        s -> System.out.println("## onSuccess(" + s + ")"),
        t -> System.out.println("## onError(" + t.getMessage() + ")")
    );

(如果您不熟悉,可以使用zip()操作符组合作为输入提供的所有源的结果,以发出另一个/新源)。

在该流中,由于B的处理最终引发异常:

  • 流在执行B的过程中终止
  • 将异常报告给观察者(即触发onError()处理程序)
  • C永远不会被处理

但是,如果您想要的是自己决定是否执行每个分支,您可以采取的一种方法是使用某种状态持有者将以前操作的结果向下传递,如下所示:

class State {
  final String value;
  final Throwable error;

  State(String value, Throwable error) {
    this.value = value;
    this.error = error;
  }
}

然后可以修改流以有条件地执行不同的批,例如:

Single
    .zip(a1, a2, a3, (s, s2, s3) -> {
      try {
        // Execute the A's here...
        return new State("A's completed successfully", null);

      } catch(Throwable t) {
        return new State(null, t);
      }
    })
    .flatMap((Function<State, SingleSource<State>>) s -> {
      if(s.error != null) {
        // If an error occurred upstream, skip this batch...
        return Single.just(s);

      } else {
        try {
          // ...otherwise, execute the B's
          return Single.just(new State("B's completed successfully", null));
          
        } catch(Throwable t) {
          return Single.just(new State(null, t));
        }
      }
    })
    .flatMap((Function<State, SingleSource<State>>) s -> {
      if(s.error != null) {
        // If an error occurred upstream, skip this batch...
        return Single.just(s);

      } else {
        try {
          // ...otherwise, execute the C's
          return Single.just(new State("C's completed successfully", null));

        } catch(Throwable t) {
          return Single.just(new State(null, t));
        }
      }
    })
    .subscribe(
        s -> {
          if(s.error != null) {
            System.out.println("## onSuccess with error: " + s.error.getMessage());
          } else {
            System.out.println("## onSuccess without error: " + s.value);
          }
        },
        t -> System.out.println("## onError(" + t.getMessage() + ")")
    );
 类似资料:
  • 问题内容: 在promise库 Q中 ,您可以执行以下操作以顺序链接promise: 但是,以下命令不适用于 $ q : 问题答案: 只需使用$ q.when()函数: 注意:foo必须是工厂,例如

  • 所以基本上我想做的是,打第一个网络电话。如果调用的RESTful web服务返回1,则进行第二次网络调用。如果web服务返回0,则不要进行第二次网络调用。 这是我的密码 显然,上面的代码是错误的,因为它应该总是返回可观察的。那么,如果第一次网络调用返回0,我的代码应该如何编写?

  • 本文向大家介绍区块链以什么顺序链接?相关面试题,主要包含被问及区块链以什么顺序链接?时的应答技巧和注意事项,需要的朋友参考一下 回答:区块链中的所有区块都以反向顺序链接,或者每个区块都与其前一个区块链接。

  • 前面学习了如何使用三元组 顺序表存储稀疏矩阵,其实现过程就是将矩阵中各个非 0 元素的行标、列标和元素值以三元组的形式存储到一维数组中。通过研究实现代码你会发现,三元组顺序表每次提取指定元素都需要遍历整个数组,运行效率很低。 本节将学习另一种存储矩阵的方法—— 行逻辑链接的顺序表。 它可以看作是三元组顺序表的升级版,即在三元组顺序表的基础上改善了提取数据的效率。 行逻辑链接的顺序表和三元组顺序表的

  • 问题内容: 如何使用链接调用JavaScript代码? 问题答案: 要么 编辑: 上面的回答确实不是一个好的解决方案,自从我最初发布以来,已经学到了很多有关JS的知识.

  • 问题内容: 如果我有这样的数据: 我如何将命令连接成这样: 我在下面使用了此查询,但命令列的顺序不依其顺序号而定: 任何意见和建议将不胜感激。^ _ ^ 问题答案: 永远不要使用。阅读为什么不在Oracle中使用WM_CONCAT函数? 请参阅本主题https://stackoverflow.com/a/28758117/3989608。 它没有记录,并且依赖的任何应用程序一旦升级到后都将无法工作