当前位置: 首页 > 知识库问答 >
问题:

使用CompletableFuture执行单个或多个可调用项并避免阻塞

夏嘉德
2023-03-14

我通过ThreadPoolExecutor执行一些可调用项。如果线程列表只包含1个可调用的线程,那么我直接调用我的CallableServicecall方法。若列表包含超过1个可调用项,那个么我将通过线程池执行器并行执行所有这些线程。

我如何在Java8 CompletableFuture中实现这一点?如果增强了future.get()以避免阻塞,这将是一个加号。

private static ThreadPoolExecutor myThreadPoolExecutor = new ThreadPoolExecutor(0, 100, 5L, TimeUnit.SECONDS, new SynchronousQueue<>());

public static void execute(List<Callable<Boolean>> threadList) throws Exception {

    List<Future<Boolean>> futureList = null;
    CallableService singleService = (CallableService) threadList.get(0);
    if (1 == threadList.size()) {
        singleService.call();
    }
    else {
        try {
            futureList = myThreadPoolExecutor.invokeAll(threadList);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    if (null != futureList) {
        for (Future<Boolean> future : futureList) {
            try {
                future.get();
            }
            catch (Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
    }
}

共有3个答案

施令秋
2023-03-14

我通过线程池执行几个可调用的。如果线程列表只包含1个可调用的,那么我直接调用我的CallableService的调用方法。如果列表包含超过1个可调用的,那么我通过线程池执行器并行执行所有这些线程。

我想你已经实现了这个部分。(如果作业繁重,并且有100个线程按配置运行,则可能会遇到内存使用问题。但这是另一个问题。)

如果future.get()被增强以避免阻塞,那将是一个优点。

为此,您可以采取以下方法:

>

  • 创建另一个ExecutorService,其工作只是运行Future.get()调用。
  • 提交您的Future.get()到该服务,如下所示。
  • 关闭它,等待终止。

    if (null != futureList) {
        ExecutorService waitSvc = Executors.newCachedThreadPool();
        for (Future<Boolean> future : futureList) {
            try {
                waitSvc.submit( () -> future.get() );
            }
            catch (Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
        waitSvc.shutdown(); //This may take some time. You may want to call awaitTermination() after this.
    }
    

    然而,我觉得你应该重新设计使用这么多线程的整体方法,除非这只是一个用于学习的应用程序。

  • 蓬森
    2023-03-14

    Future.isDone()告诉我们执行器是否完成了任务的处理。如果任务完成,则返回true,否则返回false。

     for (Future<Boolean> future : futureList) {
       while(!future.isDone()) 
       {
              doSOmethingElse();
              Thread.sleep(300);//Optional
        }
    	try {
                    future.get();
            }
        catch (Exception e) 
    	{
                    //do some calculations here and then throw exception
                    throw new Exception(e.getMessage(), e);
        }
    }
    东门宜
    2023-03-14

    不需要CompletableFuture,因为您使用ExecutorService的方式就足够了,但是,代码流的某些方面可以改进。即使不需要,也可以获取第一个元素,并将其无缘无故地转换为CallableService,因为您已经可以通过Callable接口调用该方法。在另一个分支中,您正在捕获InterruptedExc0019并继续执行,因此调用方永远不会知道并非所有作业都已执行。在直接的代码流中,您不需要检查列表中的null

    public static void execute(List<Callable<Boolean>> threadList) throws Exception {
        if(1 == threadList.size()) {
            Callable<Boolean> singleService = threadList.get(0);
            singleService.call();
        }
        else {
            List<Future<Boolean>> futureList = myThreadPoolExecutor.invokeAll(threadList);
            for(Future<Boolean> future : futureList) {
                try {
                    future.get();
                }
                catch(Exception e) {
                    //do some calculations here and then throw exception
                    throw new Exception(e.getMessage(), e);
                }
            }
        }
    }
    

    你可以把它进一步缩短到

    public static void execute(List<Callable<Boolean>> threadList) throws Exception {
        if(1 == threadList.size()) {
            threadList.get(0).call();
        }
        else {
            for(Future<Boolean> future : myThreadPoolExecutor.invokeAll(threadList)) {
                try {
                    future.get();
                }
                catch(Exception e) {
                    //do some calculations here and then throw exception
                    throw new Exception(e.getMessage(), e);
                }
            }
        }
    }
    

    但这是一个首选编码风格的问题。但请注意,我注意到,在单元素情况下,您没有执行相同的异常处理。

    要使用CompletableFuture,我们需要一个适配器方法,因为便利方法supplyAsync需要一个Supplier而不是可调用的。使用这个答案的一个修改变量,我们得到

    public static void execute(List<Callable<Boolean>> threadList) throws Exception {
        if(1 == threadList.size()) {
            threadList.get(0).call();
        }
        else {
            CompletableFuture<?> all = CompletableFuture.allOf(
                threadList.stream()
                    .map(c -> callAsync(c, myThreadPoolExecutor))
                    .toArray(CompletableFuture<?>[]::new));
            try {
                all.get();
            }
            catch(Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
    }
    public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
        CompletableFuture<R> cf = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try { cf.complete(callable.call()); }
            catch(Throwable ex) { cf.completeExceptionally(ex); }
        }, e);
        return cf;
    }
    

    所以我们没有处理提交所有作业的调用。我们必须手动执行此操作,可以使用循环操作,也可以使用流操作。另一方面,我们通过表示完成状态的allOf获得单个未来,如果至少有一个作业失败,则为例外。

    与等待完成的invokeAll不同,allOf只返回未来,因此等待完成的是all.get()调用。在此之前,我们可以执行其他操作,甚至可以使用此属性始终执行调用线程中的第一个作业:

    public static void execute(List<Callable<Boolean>> threadList) throws Exception {
        CompletableFuture<?> tail = CompletableFuture.allOf(
            threadList.stream().skip(1)
                .map(c -> callAsync(c, myThreadPoolExecutor))
                .toArray(CompletableFuture<?>[]::new)),
            head = callAsync(threadList.get(0), Runnable::run);
        try {
            head.get();
            tail.get();
        }
        catch(Exception e) {
            //do some calculations here and then throw exception
            throw new Exception(e.getMessage(), e);
        }
    }
    

    这将始终调用当前线程中的第一个可调用的,因为Runnable::run将在调用线程中立即执行该操作。但是它在所有其他方面都被统一处理,尤其是异常处理。当只有一个作业时,带有空数组的allOf调用将什么都不做,并返回一个已经完成的未来,这将具有所需的效果。

     类似资料:
    • 我将以下响应返回给用户 到目前为止,我正在进行三次连续调用来计算这个,每个调用都可以独立于其他调用运行。我尝试制作三种不同的作为: 如果我做了

    • 我试图从父组件按钮单击调用每个子组件函数。尝试使用道具并在componentDidMount函数中将按钮单击指定给新函数,但结果是只有最后一个函数警报(“child2保存”);在按钮点击时被分配和执行,我期望警报(“child1保存”);也将被执行。 如何在父组件上单击要调用的每个子函数按钮,注意子组件在父组件内部有相当大的嵌套级别。

    • 我需要执行一些任务。有些任务是独立的,有些任务依赖于其他任务的成功执行。独立任务可以并行运行以获得更好的性能。我把这些任务称为服务。列说明哪些服务将以串联方式执行,哪些服务将以并联方式执行。列描述了一组定义的服务所遵循的执行顺序。例如,服务A和B应该并行运行。如果它们已成功执行,则将执行服务C。请注意,服务C并不直接依赖于其先前服务的输出,但它必须在成功执行其先前服务后运行,因为服务C在执行期间需

    • 问题内容: 我有多个包含要发送的消息的BlockingQueues。消费者数量可能少于队列数量吗?我不想遍历队列并继续轮询它们(忙于等待),并且我不想为每个队列都分配一个线程。相反,当任何队列中都有消息可用时,我想唤醒一个线程。 问题答案: 您可以做的一个技巧是让队列排队。因此,您要做的是只有一个阻塞队列,所有线程都订阅该队列。然后,当您将某些内容排队到其中一个BlockingQueues中时,您

    • 我正在使用 Kubernetes 作为容器编排器构建一个微服务应用程序。该应用程序现已启动并运行,但我有其他问题。那是在我的服务中,我每天都有一个计划任务运行,当服务部署时,将运行多个服务实例(通过设置副本编号),创建多个同时运行的任务。我期望的是只有一个服务任务实例将运行,而不是多个实例。有什么技术可以处理这种情况吗? 库伯内特斯 Asp.net核心构建微服务 CI/CD的基岩实现 Fabrik

    • 我正在使用Python(IPython 我有一个3000个唯一ID的数组,可以从API中提取数据,一次只能使用一个ID调用API。 我希望以某种方式同时拨打3组1000个电话,以加快速度。 最好的方法是什么? 提前感谢任何帮助!