当前位置: 首页 > 知识库问答 >
问题:

如何在Java中实现一个非阻塞的未来处理器,以便以后检索处理过的结果

郎慎之
2023-03-14

我正在使用一个外部库,其中包含下面的代码。我正在发送很多命令,并在结果中插入统计信息,以检查有多少次调用失败,有多少次成功

public Future<CommandResult> sendCommand(Command command) {
    return command.execute();
}
CommandResult can be success or failure

但是,如果我使用client.sendCommand(命令). get();那么,我正在同步等待结果,同时应用程序被阻止。

我只想稍后再检查(30秒后,哪个调用成功,哪个调用失败)。我保证在10秒内得到答案。问题在于,应用程序等待计算完成,然后检索其结果。

我在思考这种基于答案的方法:

List<Future< CommandResult >> futures = new ArrayList<>();
for(Command command: commands) {
   futures.add(client.sendCommand(command)); 
} 

//in a scheduler, 30+ seconds later 
for (Future<Boolean> future : futures) {  
   saveResult(future.get());
} 

共有3个答案

扶绍辉
2023-03-14

如果将< code>Future实例转换为< code>CompletableFuture(参见Panagiotis Bougioukos的回答)是一个选项,那么您可以实现一个简单的助手函数来转换< code >流

public static <T> CompletableFuture<Stream<T>> collect(Stream<CompletableFuture<T>> futures) {
    return futures
        .map(future -> future.thenApply(Stream::of))
        .reduce(
            CompletableFuture.completedFuture(Stream.empty()),
            (future1, future2) ->
            future1
                .thenCompose(stream1 ->
            future2
                .thenApply(stream2 ->
            concat(stream1, stream2)))
        );
}

从本质上讲,这减少了与流的未来平行的期货流。

如果您在字符串的期货流上使用此选项,它将返回一个期货,该期货在单个期货的最后一个期货完成后即完成:

Stream<CompletableFuture<String>> streamOfFutures = ...
CompletableFuture<Stream<String>> futureOfStream = collect(streamOfFutures);

// Prints a list of strings once the "slowest" future completed
System.out.println(futureOfStream.get().toList());
谭宏盛
2023-03-14

Future是一个遗留的java功能,它不允许反应式非阻塞功能。CompletableFuture是Java中后来的增强功能,以允许这种反应式非阻塞功能。

您可以基于之前的SO答案尝试将您的未来转换为CompletableFuture,然后您将公开利用非阻塞执行的方法。

检查下面的例子,并进行相应的修改。

public class Application {

    public static void main(String[] args) throws ParseException {

        Future future =  new SquareCalculator().calculate(10);
        CompletableFuture<Integer> completableFuture = makeCompletableFuture(future);
        System.out.println("before apply");
        completableFuture.thenApply(s -> {
            System.out.println(s);
            return s;
        });
        System.out.println("after apply method");
    }


    public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
        if (future.isDone())
            return transformDoneFuture(future);
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (!future.isDone())
                    awaitFutureIsDoneInForkJoinPool(future);
                return future.get();
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                // Normally, this should never happen inside ForkJoinPool
                Thread.currentThread().interrupt();
                // Add the following statement if the future doesn't have side effects
                // future.cancel(true);
                throw new RuntimeException(e);
            }
        });
    }

    private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        T result;
        try {
            result = future.get();
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
            return cf;
        }
        cf.complete(result);
        return cf;
    }

    private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
            throws InterruptedException {
        ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
            @Override public boolean block() throws InterruptedException {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
                return true;
            }
            @Override public boolean isReleasable() {
                return future.isDone();
            }
        });
    }
}

然后类创建一个示例Future

public class SquareCalculator {

    private ExecutorService executor
            = Executors.newSingleThreadExecutor();

    public Future<Integer> calculate(Integer input) {
        return executor.submit(() -> {
            Thread.sleep(1000);
            return input * input;
        });
    }
}

会导致

柳鸿信
2023-03-14

我只想稍后再检查(30秒后,哪些调用成功,哪些调用失败)。我保证在10秒内得到答案。问题在于,应用程序等待计算完成,然后检索其结果。

如果您想稍后检查结果,那么您的< code >未来解决方案

如果您想在收到结果时获得结果,我会使用ExecutorCompletionService,您可以随时轮询该服务,查看是否有结果。投票是非阻塞的。

// create your thread pool using fixed or other pool
Executor<Result> threadPool = Executors.newFixedThreadPool(5);
// wrap the Executor in a CompletionService
CompletionService<Boolean> completionService =
   new ExecutorCompletionService<>(e);
// submit jobs to the pool through the ExecutorCompletionService
for (Job job : jobs) {
   completionService.submit(job);
}
// after we have submitted all of the jobs we can shutdown the Executor
// the jobs submitted will continue to run
threadPool.shutdown();
...
// some point later you can do
int jobsRunning = jobs.size();
for (int jobsRunning = jobs.size(); jobsRunning > 0; ) {
   // do some processing ...
   // are any results available?
   Boolean result = completionService.poll();
   if (result != null) {
      // process a result if available
      jobsRunning--;
   }
}

请注意,您需要跟踪您向CompletionService提交了多少作业。

 类似资料:
  • 如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间

  • 我想使用ExecutorService框架解决一些DNS任务。我使用java API进行DNS查询:1/java.net.InetSocketAddress.InetSocketAddress(String,int)-名称查找2/java.net.InetAddress.getByName(String)-名称查找3/java.net.InetAddress.getHostName()-反向名称查

  • 问题内容: OpenSSL库允许使用SSL_read从基础套接字读取并使用SSL_write对其进行写入。这些函数可能会根据其SSL协议需求(例如,在重新协商连接时),以SSL_ERROR_WANT_READ或SSL_ERROR_WANT_WRITE返回。 我不太了解API希望我如何处理这些结果。 对一个接受客户端连接的服务器应用程序进行映像,建立一个新的ssl会话,使基础套接字成为非阻塞状态,然

  • Java Future对象用于获取由并行线程(执行器)执行的异步计算的结果。我们调用Future.get()方法并等待结果就绪。此示例显示了一种从Future检索结果的非阻塞方式。java实现java非阻塞未来。 在本例中,在并行执行完成后调用onSuccess()方法。问题在于onSuccess()方法未在主线程上运行。我想在主线程上执行onSuccess()方法。我怎样才能解决这个问题。谢谢

  • Python 2.7。3 x64 wxPython 2.8 x64 我已经阅读了很多关于python线程和多处理的文章,特别是Doug Hellmann的一些文章,这些文章帮助很大。然而,我对一件事感到困惑。。。 我认为Python多处理模块或多或少是线程模块的替代品,只是args必须是可拾取的,但是我发现为了不阻塞我的GUI,我必须首先使用线程创建一个新线程。线程,然后在该线程内通过多处理进行多

  • 问题内容: 尝试为自己总结这两个概念之间的区别(因为当我看到人们在一句话中同时使用这两个概念时,我感到非常困惑,例如“ Non-blocking async IO”,我试图弄清楚它是做什么的)意思)。 因此,以我的理解,无阻塞IO是操作系统的主要机制,如果有任何可用数据,则该OS处理IO,否则仅返回错误/不执行任何操作。 在异步IO中,您仅提供回调,当数据可用时,系统将通知您的应用程序。 那么,实