我有一个方法(包含在下面)来返回completablefuture
列表的值。
该方法应该是:
第一个点工作得很好,在它超过超时限制后确实会弹出。(之后我仍然需要调用exectuorservice.shutdownNow()
以返回给调用方)。我遇到的问题是我要完成的第二件事。
假设我有一个20,000个期货的列表,所有的期货都有一个例外,那么为什么要让所有的期货都执行,如果我看到有太多的例外,那么我假设所有的期货都有问题,我想取消它们。
此外,我希望有一个超时的每一个未来的个人可能需要多长时间,但这也不会工作,谦虚的原因概述如下。
原因似乎是,当我调用AllDoNefuture.ThenApply()
时,此时它会等待并让所有的未来完成,或者成功完成,或者异常完成。只有在所有这些都完成之后,它才会穿越每一个未来,并获得它的结果。在这一点上,取消它有什么好处,当他们已经完成。
如果有人能告诉我如何完成这个特定的需求,我将非常感激:“监视异常和个别超时,并在此基础上取消所有其他超时”。
多谢了。
下面是我写的方法:
/**
* @param futures a list of completable futures
* @param timeout how long to allow the futures to run before throwing exception
* @param timeUnit unit of timeout
* @param allowedExceptions how many of the futures do we tolerate exceptions,
* NOTE: if an exception is thrown from the futures it will return null, until it reaches the allowedExceptions threshold
* */
public static <T> List<T> extractFromFutures(List<CompletableFuture<T>> futures, int timeout, TimeUnit timeUnit, int allowedExceptions) {
CompletableFuture<Void> allDoneFuture = CompletableFuture
.allOf(futures.toArray(new CompletableFuture[futures.size()]));
try {
AtomicInteger exceptionCount = new AtomicInteger(0);
return allDoneFuture.thenApply(v ->//when all are done
futures.stream().
html" target="_blank">map(future -> {
try {
//if only I could set an individual timeout
return future.get(timeout, timeUnit);
} catch (Exception e) {
future.cancel(true);
int curExceptionCnt = exceptionCount.incrementAndGet();
if(curExceptionCnt >= allowedExceptions){
//I would've hoped that it will throw it to the calling try-catch
//and then cancel all futures, but it doesn't
throw new RuntimeException(e);
}
else{
return null;
}
}
}).
collect(Collectors.<T>toList())
).get(timeout, timeUnit);
} catch (Exception e) {
allDoneFuture.cancel(true);
throw new RuntimeException(e);
}
}
要在一定数量的异常之后取消所有剩余的期货,您可以对每个异常调用exceptionary
,增加异常计数,并可能在其中取消它们。
对于单独的超时,您可以创建一个类,使用它的超时来保存未来,然后根据超时对它们进行排序,并调用get
并使用超时减去经过的时间。
static class FutureWithTimeout<T> {
CompletableFuture<T> f;
long timeout;
TimeUnit timeUnit;
FutureWithTimeout(CompletableFuture<T> f, long timeout, TimeUnit timeUnit) {
this.f = f;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
}
public static <T> List<T> extractFromFutures(List<FutureWithTimeout<T>> futures, int allowedExceptions) {
AtomicInteger exceptionCount = new AtomicInteger(0);
futures.forEach(f -> f.f.exceptionally(t -> {
if(exceptionCount.getAndIncrement() == allowedExceptions){
futures.forEach(c -> c.f.cancel(false));
}
return null;
}));
long t = System.nanoTime();
return futures.stream()
.sorted(Comparator.comparingLong(f -> f.timeUnit.toNanos(f.timeout)))
.map(f -> {
try {
return f.f.get(Math.max(0, f.timeUnit.toNanos(f.timeout) - (System.nanoTime() - t)),
TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
f.f.cancel(false);
return null;
}
})
.collect(Collectors.toList());
}
请注意,这可能会以与传入的顺序不同的顺序返回列表。如果您需要相同的顺序,那么您可以将map().collection()
更改为foreachordered
,然后在不排序的情况下将它们重新映射到它们的结果中。
此外,cancel
的mayinterruptifrunning
参数对completablefuture
没有影响,因此我将其更改为false。
问题内容: 我有 如果正在使用entitymanager,如何检索会话?如何从分离标准中获取结果? 问题答案: 为了完全详尽无遗,如果您使用的是JPA 1.0或JPA 2.0实现,则情况有所不同。 JPA 1.0 对于JPA 1.0,您必须使用。但是请记住, 此方法的结果是特定 于 实现的, 即从使用Hibernate的应用程序服务器到其他服务器之间不可移植。例如,使用JBoss,您可以执行以下操
我想构建一个简单的消费者程序(java ),以获取ActiveMQ主题中存储的所有消息。我有一个在队列中发送文本消息的生成器。 但我不知道如何开始写我的消费者来检索旧消息并等待新消息。 如果你有一个例子,谢谢! 这是我的制片人:http://pastebin.com/uRy9D8mY 这是我的消费者:http://pastebin.com/bZh4r66e 当我先于消费者运行生产者,然后运行消费者
问题内容: 我遇到了这个问题,我不知道需要迭代其实际值的实际类型。 任何想法如何从enumValue中提取其可能的值? 问题答案:
如何取消/中止angular 4中所有挂起的HTTP请求。 有一个取消HTTP请求的方法,但是如何一次取消所有挂起的请求。 尤其是在改变路线的时候。 我做了一件事 但是如何在全球范围内实现这一点 有什么想法吗?
问题内容: 是否有一种快速的方法可以从中的所有表获取所有列名,而不必列出所有表? 问题答案:
问题内容: 我今天一直在测试Javascript CSS函数,并注意到当使用.style.cssText时,它仅提供了我用JS设置的CSS。 相反,我想获取元素的所有CSS,所以我猜我在做错什么,或者可能需要另一个函数,例如getComputedStyle,但要使用整个CSS而不是单个属性值,但是我找不到搜索时需要的东西。 所以我的问题是如何从代码的最后部分获得完整的CSS,例如: 而不是当前输出