当前位置: 首页 > 知识库问答 >
问题:

Vertx与Futures中任意数量调用的顺序组合

乔丁雨
2023-03-14

我们在vertx中使用Futures,例如:

Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);

        fetchVehicle.compose(vehicleJson -> vehicleDoor(routingContext, client, vehicleJson, lock)).setHandler(
                asyncResult -> {
                    if (asyncResult.succeeded()) {
                    LOG.info("Door operation succeeded with result {}", asyncResult.result().encode());
                    handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
                }
                else {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                }
        });

例如,我们处理2个呼叫。

或者我有另一个代码段,可以处理任意数量的方法:

List<Future> futures = new ArrayList<>();
        conversation.getRequestList().forEach(req -> {
            Future<Message<Object>> senderFuture = Future.future();
            vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, JsonObject.mapFrom(req), deliveryOptions, senderFuture.completer());

            // sent successfully. save the replyAddress and the conversation for later/callback
            log.info("Saving the conversation for the request.", conversation.getReplyAddress());
            pendingCommands.put(req.getBody().getString(MSG_ID), conversation);

            futures.add(senderFuture);
        });

        CompositeFuture.all(futures).setHandler(ar -> {
            if (ar.succeeded()) {
                handler.handle(Future.succeededFuture());
            } else {
                log.error("forwardToVWClient VW got result : {}", ar.cause());
                handler.handle(Future.failedFuture(ar.cause()));
            }
        });

这里,我们将链接会话中的所有请求。getRequestList(),而不事先知道它们的计数。

但是<代码>的缺点。all()方法是,我们无法控制订单。

如何使用Vertx Futures链接任意数量的方法(不知道调用的确切数量)?

编辑:

官方指南谈到了顺序组合,但给出的示例有3个调用。它没有解释如何对任意数量的调用执行此操作。

参见http://vertx.io/docs/vertx-core/java/中的“顺序合成”

我希望这是清楚的。

共有3个答案

侯令雪
2023-03-14

这里有一些方便的东西。希望有帮助。

public static <R> Future<List<R>> allOfFutures(List<Future<R>> futures) {
    return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()]))
            .map(v -> futures.stream()
                    .map(Future::result)
                    .collect(Collectors.toList())
            );
}
长孙星汉
2023-03-14

如果您想将上一个请求的响应提供给下一个请求,并且假设您对每个响应有不同的处理程序。您可以添加一个帮助方法

private <T> Future<T> chain(Future<T> init, List<Function<T, Future<T>>> handlers) {
    Future<T> result = init;
    for (Function<T, Future<T>> handler : handlers) {
        result = result.compose(handler);
    }
    return result;
}

然后像这样改变你的代码

    Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);

    Function<JsonObject, Future<JsonObject>> vehicleResponseHandler = vehicleJson ->
        vehicleDoor(routingContext, client, vehicleJson, lock);

    Function<JsonObject, Future<JsonObject>> anotherTrivialHandler = someJsonObj -> {
        // add here new request by using information from someJsonObj
        LOG.info("Hello from trivial handler {} ", someJsonObj);
        return Future.succeededFuture(someJsonObj);
    };

    List<Function<JsonObject, Future<JsonObject>>> handlers = new ArrayList<>();

    handlers.add(vehicleResponseHandler);
    handlers.add(anotherTrivialHandler);

    chain(fetchVehicle, handlers).setHandler( asyncResult -> {
        if (asyncResult.succeeded()) {
            handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
        } else {
            handler.handle(Future.failedFuture(asyncResult.cause()));
        }
    });

但是这种实现有一个限制,它要求每个链接的Future必须具有相同的类型参数T

闽涵蓄
2023-03-14

下面是使用映射的解决方案

 public static <T> Future<String> chainCall(List<T> list, Function<T, Future<String>> method){
        return list.stream().reduce(Future.succeededFuture(),// the initial "future"
                (acc, item) -> acc.compose(v -> method.apply(item)), // we return the compose of the previous "future" with "future" returned by next item processing
                (a,b) -> Future.future()); // not used! only useful for parallel stream.
    }

可按以下示例使用:

 chainCall(conversation.getRequestList(), this::sendApiRequestViaBus);

其中,sendApiRequestViaBus是:

/**
     * @param request The request to process
     * @return The result of the request processing. 
     */
    Future<String> sendApiRequestViaBus(ApiRequest request) {
        Future<String> future = Future.future();
        String address = CommandUtilsFactory.getInstance(request.getImplementation()).getApiClientAddress();
        log.debug("Chain call start msgId {}", request.getId());

        vertx.eventBus().send(address, JsonObject.mapFrom(request), deliveryOptions, res -> {
            log.debug("Chain call returns {}", request.getId());
            if (res.succeeded()) {
                future.complete("OK");
            } else {
                future.fail("KO");
            }
        });
        return future;
    }

我希望它能有所帮助。

 类似资料:
  • 问题内容: 我有一个元组列表: 该列表可以是任意长度,元组也可以。我想按它们出现的顺序将其转换成元素的列表或元组: 如果在开发时知道我会得到多少个元组,我可以添加它们: 但是由于直到运行时我才知道会有多少个元组,所以我无法做到这一点。我觉得有一种方法可以使用,但我无法弄清楚。我可以遍历元组并将其添加到累加器中,但是那样会创建很多永远不会使用的中间元组。我还可以遍历元组,然后遍历元组的元素,并将它们

  • 我的表(表1)中有一条名为“Jonh Wood Doe Smith”的记录,即使用户键入任何可能的组合,我也想返回它:“John Doe”、“Jonn Wood Smith”等 我实现了一个collumn(全名),它是一个包含所有名称的数组,并打算像这样搜索它: 你知道这是否可能和/或解决这类问题的最佳方法是什么吗?我会使用postgresql,所以专有方法、函数等都不是问题。它不需要与其他数据库

  • 我想创建一个方法,它可以具有任意数量的参数和任何数据类型的任何方法。 为。我想动态调用以下所有方法void method1(int x,int y)void method2(int x,String y)void method3(Float x,Long y,String z) 我正在使用反射来这样做。现在我讨论了如何在调用这个动态方法的过程中向这些方法传递参数。我做了多远... 如何调用MyMe

  • 问题内容: 我希望这行JavaScript: 返回类似: 但相反,它仅返回最后捕获的匹配项: 有没有办法获取所有捕获的比赛? 问题答案: 在大多数情况下,当您重复一个捕获组时,仅保留最后一个捕获。以前的任何捕获都将被覆盖。以某种形式,例如.NET,您可以获取所有中间捕获,但是Javascript并非如此。 也就是说,在Javascript中,如果您有一个带有 N个 捕获组的模式,则即使重复了其中一

  • 打扰一下。我还在学习令人惊叹的JQuery语言。我遇到了一个问题,读了很多书,但仍然一团糟。希望你能指导我解决问题。 我有三个函数,它们执行三个post调用。它们返回一个文本变量,我最近将其解析为JSON(如果可用)。 问题是所有这些功能都很有效。。。当然,它们是异步的。我需要: 按顺序执行,就像它们是同步的 除此之外,(这是我的挑战之一)函数a接收到一个param1,但在执行后,代码将param