当前位置: 首页 > 知识库问答 >
问题:

如何持续向执行者提交任务

程天佑
2023-03-14

我一直在寻找这样一种情况的解决方案:我有一个调用项的哈希集,并且我要将这个集提交给执行器进行并行执行。现在我想只要任何提交的任务完成,我应该能够分配一个新的Callable到Executor。

我尝试了这段代码,但是如果我使用Executor.Invoke,那么Executor将等待直到所有任务完成,如果我使用Executor.Submit,那么任务将按顺序完成。如有任何帮助,我们将不胜感激。

package poc.threading;

import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrencyPoC_UsingExecutor_GetFreeThread {

    public static void main(String[] args) {
        // TODO Auto-generated method stub

        executor();
    }


    public static void executor()
    {
        try{

            ExecutorService ex = Executors.newCachedThreadPool();

            //create a set with all callables task
            HashSet<Callable<Object>> callables = new HashSet<>();
            callables.add(task1());
            callables.add(task2());
            callables.add(task3());
            callables.add(task4());

            //executes all task together but executor waits for completion of all tasks

            List<Future<Object>> fu = ex.invokeAll(callables);
            for(int i=0; i<fu.size(); i++)
            {
                System.out.println(fu.get(i).get() + " , " + Thread.currentThread().getName().toString());
            }

            //executes tasks sequentially 
            for(Callable<Object> task : callables)
            {
                Future<Object> future = ex.submit(task);
                System.out.println(future.get() + " , " + Thread.currentThread().getName().toString());
            }
            ex.shutdownNow();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
    }


    public static Callable<Object> task1() throws InterruptedException
    {
        return new Callable<Object>() {

            @Override
            public Object call() throws Exception {

                int count = 0;
                while(count < 3)
                {
                    System.out.println("****** SLEEP TASK1 ******* "+count);
                    Thread.sleep(500);
                    count ++;
                }
                return "Sleep Task Of 500 Completed";
            }
        };
    }


    public static Callable<Object> task2() throws InterruptedException
    {
        return new Callable<Object>() {

            @Override
            public Object call() throws Exception {

                int count = 0;
                while(count < 6)
                {
                    System.out.println("****** SLEEP TASK2 ******* "+count);
                    Thread.sleep(300);
                    count ++;
                }
                return "Sleep Task Of 300 Completed";
            }
        };
    }


    public static Callable<Object> task3() throws InterruptedException
    {
        return new Callable<Object>() {

            @Override
            public Object call() throws Exception {

                int count = 0;
                while(count < 2)
                {
                    System.out.println("****** SLEEP TASK3 ******* "+count);
                    Thread.sleep(1000);
                    count ++;
                }
                return "Sleep Task Of 1000 Completed";
            }
        };
    }

    public static Callable<Object> task4() throws InterruptedException
    {
        return new Callable<Object>() {

            @Override
            public Object call() throws Exception {

                int count = 0;
                while(count < 4)
                {
                    System.out.println("****** SLEEP TASK4 ******* "+count);
                    Thread.sleep(600);
                    count ++;
                }
                return "Sleep Task Of 1000 Completed";
            }
        };
    }
}

共有1个答案

叶晋
2023-03-14

在第二个示例中,任务按顺序运行的原因是,在对后续任务调用submit()之前,您要对将来调用get()。如果您在任何get之前完成所有的submits,那么它们将并行运行。

如果您正在寻找相互依赖的任务,请查看CompletableFuture类。这种类型的future将允许您在第一个任务开始后开始另一个任务:

CompletableFuture<Object> task1 = CompletableFuture.supplyAsync(() -> task1(), ex);
CompletableFuture<Object> task2 = task1.thenApplyAsync(task1Result -> task2(), ex);
 类似资料:
  • 要并行或异步运行一些东西,我可以使用ExecutorService:

  • 我正在使用sping-boot,我有这样一个用例,我想将列表的每个元素提交给执行器服务(线程池大小=4)。但是在每个必须处理的元素之间,我想要1秒钟的延迟。 Thread.sleep(1000)不工作,因为执行程序一睡觉就启动另一个线程。 编辑:这是我的process()方法,我在最后尝试使用sleep,但没有成功。

  • 我们在Spring web应用程序中使用预定任务来发送提醒、每日摘要等: 每个调度任务调用一个给定的服务方法(上面伪代码中的fooService.bar())。我想监控每次处决持续多长时间。随着负载、数据或复杂性的增加,其中一些方法可能需要更长的时间。我可以给每个服务方法添加日志记录语句(现在大约有10个,但将来可能会更多),或者使用方面给每个方法添加一些秒表行为。但是,对于spring中的所有预

  • 我有一个2节点的Spark集群,每个节点有4个核心。 null 根据文档: Spark将为集群的每个部分运行一个任务。通常情况下,集群中的每个CPU需要2-4个片。 我将slices设置为8,这意味着工作集将被划分为集群上的8个任务,反过来每个工作节点得到4个任务(每个核心1:1) 我假设理想情况下,我们应该调优,使其与每个节点(在同构集群中)中的核数相对应,以便每个核获得自己的执行器和任务(1:

  • 交付管道的建立和自动化是持续交付的基础 持续集成 更关注代码质量。持续集成是为了确保随着需求变化而变化的代码,在实现功能的同时,质量不受影响。因此,在每一次构建后会运行单元测试,保证代码级的质量。单元测试会针对每一个特定的输入去判断和观察输出的结果,而单元测试的粒度则用来平衡持续集成的质量和速度。 持续集成的核心价值在于1: 持续集成中的任何一个环节都是自动完成的,无需太多的人工干预,有利于减少重

  • 问题内容: 我希望HTML表单提交后不执行任何操作。 这是不好的,因为它会导致页面重新加载。 基本上,我希望每当按下一个按钮或有人在键入数据后点击它时,都将调用Ajax函数。是的,我可以删除表单标签并添加,只需从按钮的onclick事件中调用该函数即可,但我也希望“命中进入”功能而又不至于让人头疼。 问题答案: 通过使用您在“提交”按钮中调用的JavaScript代码,可以停止提交表单。 基本上,