我正在使用一个外部库,其中包含下面的代码。我正在发送很多命令,并在结果中插入统计信息,以检查有多少次调用失败,有多少次成功
public Future<CommandResult> sendCommand(Command command) {
return command.execute();
}
CommandResult can be success or failure
但是,如果我使用client.sendCommand(命令). get();
那么,我正在同步等待结果,同时应用程序被阻止。
我只想稍后再检查(30秒后,哪个调用成功,哪个调用失败)。我保证在10秒内得到答案。问题在于,应用程序等待计算完成,然后检索其结果。
我在思考这种基于答案的方法:
List<Future< CommandResult >> futures = new ArrayList<>();
for(Command command: commands) {
futures.add(client.sendCommand(command));
}
//in a scheduler, 30+ seconds later
for (Future<Boolean> future : futures) {
saveResult(future.get());
}
如果将< code>Future实例转换为< code>CompletableFuture(参见Panagiotis Bougioukos的回答)是一个选项,那么您可以实现一个简单的助手函数来转换< code >流
public static <T> CompletableFuture<Stream<T>> collect(Stream<CompletableFuture<T>> futures) {
return futures
.map(future -> future.thenApply(Stream::of))
.reduce(
CompletableFuture.completedFuture(Stream.empty()),
(future1, future2) ->
future1
.thenCompose(stream1 ->
future2
.thenApply(stream2 ->
concat(stream1, stream2)))
);
}
从本质上讲,这减少了与流的未来平行的期货流。
如果您在字符串的期货流上使用此选项,它将返回一个期货,该期货在单个期货的最后一个期货完成后即完成:
Stream<CompletableFuture<String>> streamOfFutures = ...
CompletableFuture<Stream<String>> futureOfStream = collect(streamOfFutures);
// Prints a list of strings once the "slowest" future completed
System.out.println(futureOfStream.get().toList());
Future
是一个遗留的java功能,它不允许反应式非阻塞功能。CompletableFuture
是Java中后来的增强功能,以允许这种反应式非阻塞功能。
您可以基于之前的SO答案尝试将您的未来
转换为CompletableFuture,
然后您将公开利用非阻塞执行的方法。
检查下面的例子,并进行相应的修改。
public class Application {
public static void main(String[] args) throws ParseException {
Future future = new SquareCalculator().calculate(10);
CompletableFuture<Integer> completableFuture = makeCompletableFuture(future);
System.out.println("before apply");
completableFuture.thenApply(s -> {
System.out.println(s);
return s;
});
System.out.println("after apply method");
}
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
}
然后类创建一个示例Future
public class SquareCalculator {
private ExecutorService executor
= Executors.newSingleThreadExecutor();
public Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
}
会导致
我只想稍后再检查(30秒后,哪些调用成功,哪些调用失败)。我保证在10秒内得到答案。问题在于,应用程序等待计算完成,然后检索其结果。
如果您想稍后检查结果,那么您的< code >未来解决方案
如果您想在收到结果时获得结果,我会使用ExecutorCompletionService
,您可以随时轮询该服务,查看是否有结果。投票是非阻塞的。
// create your thread pool using fixed or other pool
Executor<Result> threadPool = Executors.newFixedThreadPool(5);
// wrap the Executor in a CompletionService
CompletionService<Boolean> completionService =
new ExecutorCompletionService<>(e);
// submit jobs to the pool through the ExecutorCompletionService
for (Job job : jobs) {
completionService.submit(job);
}
// after we have submitted all of the jobs we can shutdown the Executor
// the jobs submitted will continue to run
threadPool.shutdown();
...
// some point later you can do
int jobsRunning = jobs.size();
for (int jobsRunning = jobs.size(); jobsRunning > 0; ) {
// do some processing ...
// are any results available?
Boolean result = completionService.poll();
if (result != null) {
// process a result if available
jobsRunning--;
}
}
请注意,您需要跟踪您向CompletionService
提交了多少作业。
如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间
我想使用ExecutorService框架解决一些DNS任务。我使用java API进行DNS查询:1/java.net.InetSocketAddress.InetSocketAddress(String,int)-名称查找2/java.net.InetAddress.getByName(String)-名称查找3/java.net.InetAddress.getHostName()-反向名称查
问题内容: OpenSSL库允许使用SSL_read从基础套接字读取并使用SSL_write对其进行写入。这些函数可能会根据其SSL协议需求(例如,在重新协商连接时),以SSL_ERROR_WANT_READ或SSL_ERROR_WANT_WRITE返回。 我不太了解API希望我如何处理这些结果。 对一个接受客户端连接的服务器应用程序进行映像,建立一个新的ssl会话,使基础套接字成为非阻塞状态,然
Java Future对象用于获取由并行线程(执行器)执行的异步计算的结果。我们调用Future.get()方法并等待结果就绪。此示例显示了一种从Future检索结果的非阻塞方式。java实现java非阻塞未来。 在本例中,在并行执行完成后调用onSuccess()方法。问题在于onSuccess()方法未在主线程上运行。我想在主线程上执行onSuccess()方法。我怎样才能解决这个问题。谢谢
Python 2.7。3 x64 wxPython 2.8 x64 我已经阅读了很多关于python线程和多处理的文章,特别是Doug Hellmann的一些文章,这些文章帮助很大。然而,我对一件事感到困惑。。。 我认为Python多处理模块或多或少是线程模块的替代品,只是args必须是可拾取的,但是我发现为了不阻塞我的GUI,我必须首先使用线程创建一个新线程。线程,然后在该线程内通过多处理进行多
问题内容: 尝试为自己总结这两个概念之间的区别(因为当我看到人们在一句话中同时使用这两个概念时,我感到非常困惑,例如“ Non-blocking async IO”,我试图弄清楚它是做什么的)意思)。 因此,以我的理解,无阻塞IO是操作系统的主要机制,如果有任何可用数据,则该OS处理IO,否则仅返回错误/不执行任何操作。 在异步IO中,您仅提供回调,当数据可用时,系统将通知您的应用程序。 那么,实