当前位置: 首页 > 知识库问答 >
问题:

完成未来与超时不工作

敖涵容
2023-03-14

我是新的完全未来。我试图为元素列表(即参数)调用并行方法,然后将结果组合起来创建最终响应。我还试图设置50毫秒的超时,以便如果调用不返回50毫秒,我将返回默认值。

到目前为止,我已经尝试过:

    {

     List<ItemGroup> result = Collections.synchronizedList(Lists.newArrayList());

    try {
     List<CompletableFuture> completableFutures = response.getItemGroupList().stream()
     .map(inPutItemGroup -> 
       CompletableFuture.runAsync(() -> {
           final ItemGroup itemGroup = getUpdatedItemGroup(inPutItemGroup);               //call which I am tryin to make parallel

           // this is thread safe
           if (null != itemGroup) {
                  result.add(itemGroup); //output of the call
           }
        }, executorService).acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS),inPutItemGroup))  //this line throws error     
     .collect(Collectors.toList());

// this will wait till all threads are completed
    CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]))
                        .join();
} catch (final Throwable t) {
     final String errorMsg = String.format("Exception occurred while rexecuting parallel call");
                log.error(errorMsg, e);
                result = response.getItemGroupList(); //default value - return the input value if error
    }

    Response finalResponse = Response.builder()
                    .itemGroupList(result)
                    .build();

    }

     private <T> CompletableFuture<T> timeoutAfter(final long timeout, final TimeUnit unit) {
            CompletableFuture<T> result = new CompletableFuture<T>();

            //Threadpool with 1 thread for scheduling a future that completes after a timeout
            ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
            String message = String.format("Process timed out after %s %s", timeout, unit.name().toLowerCase());
            delayer.schedule(() -> result.completeExceptionally(new TimeoutException(message)), timeout, unit);
            return result;
     }

但我一直得到错误说:

 error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
    [javac]                             itemGroup))

incompatible types: inference variable T has incompatible bounds
    [javac]                     .collect(Collectors.toList());
    [javac]                             ^
    [javac]     equality constraints: CompletableFuture
    [javac]     lower bounds: Object
    [javac]   where T is a type-variable:

有人能告诉我我在这里做错了什么吗?如果我走错了方向,请纠正我。

谢谢

共有2个答案

别峻
2023-03-14

acceptor的签名如下所示:

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, 
    Consumer<? super T> action
) {

抛出错误的一行看起来像这样:

.acceptEither(
     timeoutAfter(50, TimeUnit.MILLISECONDS),
     inPutItemGroup
)

因此,您可以看到您试图将一个ItemGroup作为一个消费者传递

error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>

令狐钧
2023-03-14

而不是

acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS), inPutItemGroup))

你需要

applyToEither(timeoutAfter(50, TimeUnit.MILLISECONDS), x -> inPutItemGroup)

编译代码。“接受”是使用值而不返回新值的操作,“应用”是生成新值的操作。

然而,仍然有一个逻辑错误。由timeoutAfter返回的未来将异常完成,因此依赖阶段也将异常完成,而不计算函数,因此这种链接方法不适合用默认值替换异常。

更糟糕的是,修复此问题将创建一个新的未来,该未来将由任一源未来完成,但这不会影响结果。添加(itemGroup)在一个源目录中执行的操作。在代码中,生成的未来只用于等待完成,而不用于评估结果。因此,当超时时间过去时,您将停止等待完成,而仍然可能有后台线程修改列表。

正确的逻辑是将获取值的步骤和将结果(获取的值或默认值)添加到结果列表的步骤分开,前者可能在超时时被默认值取代。然后,您可以等待所有add操作的完成。在超时时,可能仍有正在进行的getUpdatedItemGroup计算(无法停止它们的执行),但它们的结果将被忽略,因此不会影响结果列表。

还值得指出的是,为每个列表元素创建一个新的schduledExecutorService(使用后不会关闭,更糟糕的是)不是正确的方法。

// result must be effectively final
List<ItemGroup> result = Collections.synchronizedList(new ArrayList<>());
List<ItemGroup> endResult = result;
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
try {
    CompletableFuture<?>[] completableFutures = response.getItemGroupList().stream()
    .map(inPutItemGroup ->
        timeoutAfter(delayer, 50, TimeUnit.MILLISECONDS,
            CompletableFuture.supplyAsync(
                () -> getUpdatedItemGroup(inPutItemGroup), executorService),
            inPutItemGroup)
         .thenAccept(itemGroup -> {
            // this is thread safe, but questionable,
            // e.g. the result list order is not maintained
            if(null != itemGroup) result.add(itemGroup);
         })
    )
    .toArray(CompletableFuture<?>[]::new);

    // this will wait till all threads are completed
    CompletableFuture.allOf(completableFutures).join();
} catch(final Throwable t) {
    String errorMsg = String.format("Exception occurred while executing parallel call");
    log.error(errorMsg, e);
    endResult = response.getItemGroupList();
}
finally {
    delayer.shutdown();
}

Response finalResponse = Response.builder()
    .itemGroupList(endResult)
    .build();
private <T> CompletableFuture<T> timeoutAfter(ScheduledExecutorService es,
    long timeout, TimeUnit unit, CompletableFuture<T> f, T value) {

    es.schedule(() -> f.complete(value), timeout, unit);
    return f;
}

在这里,Supply yAsync生成一个CompletableFuture,它将提供getUpdatedItemGroup计算的结果。调用将在超时后安排一个带有默认值的完成,而不会创建一个新的未来,然后,通过thenAccess链接的依赖操作将把结果值添加到结果列表中。

请注意,synizedList允许从多个线程添加元素,但是从多个线程添加将导致不可预测的顺序,与源列表的顺序无关。

 类似资料:
  • 问题内容: 我尽力而为,但没有找到任何文章和博客可以清楚地比较和,并且提供了很好的分析。 因此,如果任何人都可以向我解释或指向这样的博客或文章,那对我来说真的非常好。 问题答案: 无论 ListenableFuture 和 CompletableFuture 有超过它的父类的优势 未来 通过允许呼叫者在这样或那样的回调“注册”当异步动作已经完成被调用。 使用 Future, 您可以执行以下操作:

  • 我正在使用Java8,我想知道对3个异步作业强制超时的推荐方法,我将执行异步并从未来检索结果。请注意,所有3个作业的超时是相同的。如果超出时间限制,我还想取消作业。 我在想这样的事情: 像这样的东西有用吗?有更好的方法吗? 如何正确地取消未来?Javadoc说,线程不能被中断?所以,如果我取消一个未来,并调用,我会立即得到结果,因为线程不会被中断吗? 在等待结束后,是否建议使用run()或get(

  • 问题内容: 一定时间后如何使承诺超时?我知道Q有一个Promise超时,但是我使用的是本机NodeJS Promise,它们没有.timeout函数。 我是否想念一个或包裹不同的包裹? 或者,下面的实现在不占用内存的情况下是否很好,实际上按预期方式工作? 还可以以某种方式将其包装到全局中,以便将其用于我创建的每个promise,而不必重复setTimeout和clearTimeout代码吗? 谢谢

  • 我无法使用截击库。我不想无限期地等待请求,所以我不想设置超时。但它不起作用。我在其他地方也有同样的东西(我使用RequestFuture而不是RequestFuture),它运行良好,但在这里我无法将其设置为工作状态。 如果你能提供任何帮助,那就太棒了!谢谢

  • 如果一个字段在可完成的未来代码中为空,我必须发送一个异常: 这个想法是,如果孩子的字段为空(在本例中为lastName),我必须抛出一个自定义异常,我不太确定如何实现这一点。 我的想法是使用thenAccept方法发送异常,如下所示: 我必须评估数据库中的lastName是否为空,我必须抛出一个异常。 有什么想法吗?

  • 主功能仅输出1、2和3。为什么run中的Runnable不能同时运行和打印4? 但是,当我改变doWhat函数如下所示,所有的数字1,2,3和4将被打印。