当前位置: 首页 > 知识库问答 >
问题:

在Java中并行执行依赖任务

拓拔坚
2023-03-14

我需要找到一种在java中并行执行任务(依赖和独立)的方法。

  1. 任务A和任务C可以独立运行
  2. 任务B取决于任务A的输出

我查了java。util。并发Future和Fork/Join,但看起来我们无法向任务添加依赖项。

有人能告诉我正确的Java API吗。

共有3个答案

陆翰学
2023-03-14

使用阻塞队列。将任务A的输出放入队列,任务B阻塞,直到队列中有可用的内容。

文档包含实现此目的的示例代码:http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html

慕弘深
2023-03-14

具有依赖关系的任务的通用编程模型是数据流。简化模型,其中每个任务只有一个依赖项(尽管重复),这是参与者模型。Java有很多actor库,但用于数据流的很少。另请参见:java的哪种参与者模型库框架,嵌套回调的java模式

潘嘉佑
2023-03-14

在Scala中,这很容易做到,我认为最好使用Scala。下面是我从这里得到的一个例子http://danielwestheide.com/(Scala新手指南第16部分:从这里去哪里)这家伙有一个很棒的博客(我不是那个家伙)

让我们找一位做咖啡的律师。要做的任务是:

  1. 研磨所需的咖啡豆(无前述任务)
  2. 加热一些水(没有之前的任务)
  3. 用磨碎的咖啡和热水煮一杯浓咖啡(取决于1)

或者像一棵树:

Grind   _
Coffe    \
          \   
Heat    ___\_Brew____ 
Water                \_____Combine
                     /
Foam    ____________/
Milk

在使用并发api的java中,这将是:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Barrista {

    static class HeatWater implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Heating Water");
            Thread.sleep(1000);
            return "hot water";
        }
    }

    static class GrindBeans implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Grinding Beans");
            Thread.sleep(2000);
            return "grinded beans";
        }
    }

    static class Brew implements Callable<String> {

        final Future<String> grindedBeans;
        final Future<String> hotWater;

        public Brew(Future<String> grindedBeans, Future<String> hotWater) {
            this.grindedBeans = grindedBeans;
            this.hotWater = hotWater;
        }

        @Override
        public String call() throws Exception
        {
            System.out.println("brewing coffee with " + grindedBeans.get()
                    + " and " + hotWater.get());
            Thread.sleep(1000);
            return "brewed coffee";
        }
    }

    static class FrothMilk implements Callable<String> {

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "some milk";
        }
    }

    static class Combine implements Callable<String> {

        public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
            super();
            this.frothedMilk = frothedMilk;
            this.brewedCoffee = brewedCoffee;
        }

        final Future<String> frothedMilk;
        final Future<String> brewedCoffee;

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            System.out.println("Combining " + frothedMilk.get() + " "
                    + brewedCoffee.get());
            return "Final Coffee";
        }

    }

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2);

        FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
        FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
        FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
        FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
        FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));

        executor.execute(heatWaterFuture);
        executor.execute(grindBeans);
        executor.execute(brewCoffee);
        executor.execute(frothMilk);
        executor.execute(combineCoffee);


        try {

            /**
             *  Warning this code is blocking !!!!!!!
             */         
            System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
            e.printStackTrace();
        } finally{
                executor.shutdown();
            }
        }
    }

但要确保添加了超时,以确保代码不会永远等待某件事情完成,这是通过使用未来完成的。获取(long,TimeUnit),然后相应地处理故障。

它在scala更好,但是,这里就像它在博客上:准备一些咖啡的代码看起来像这样:

def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

例如,所有方法都返回一个未来(类型化的未来),grind是这样的:

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
   // grinding function contents
}

对于所有的实现,请查看博客,但仅此而已。您还可以轻松地集成Scala和Java。我真的建议用Scala而不是Java来做这类事情。Scala需要更少的代码、更干净和事件驱动。

 类似资料:
  • 问题内容: 我正在使用python 2.7,我有一些看起来像这样的代码: 此处唯一的依赖项如下:dependent1需要等待任务1-3,Dependent2需要等待任务4-6,而dependent3需要等待依赖项1-2 …以下是可以的:首先运行全部6个任务并行,然后是前两个从属。 我希望尽可能多的任务并行运行,我已经在Google上搜索了一些模块,但是我希望避免使用外部库,并且不确定队列线程技术如

  • 我试图通过这个Gradle插件https://github.com/theboegl/gradle-launch4j使用http://launch4j.sourceforge.net/。 当我执行时,我会得到以下输出。 这是我的年级版本信息。 这是我的构建脚本。

  • 我找不到关于我们被要求进行的调查的具体答案 我看到并行流在使用少量线程时性能可能不是那么好,而且当DB在处理当前请求的同时阻止下一个请求时,它的表现显然也不是那么好 然而,我发现实现任务执行器与并行流的开销是巨大的,我们实现了一个POC,它只需要这一行代码就能满足并发需求: 而在Task Executor中,我们需要重写Runnable接口并编写一些繁琐的代码,以使Runnable不是空的,并返回

  • 问题内容: 我正在编写一个新的Jenkins管道,并具有一组最终要并行运行的步骤。但是,在开发此管道时,我想强制其顺序运行。我没有看到任何指定并行步骤使用的线程数或类似方法的方法。这是到目前为止的基本代码: 我希望能够依次运行这些Shell脚本而无需更改很多代码。 问题答案: 而不是您可以这样使用:

  • 假设我有几个任务要在Java中并行运行。每个任务要么返回成功,要么返回失败。每个任务都有一个相关的截止日期。如果任务未在截止日期前完成,它将被中断(所有任务都可中断)并返回失败。 如果其中一个任务失败(即返回失败),我们将中断所有仍在运行的其他任务。 我们应该等到所有任务都完成,最后如果所有任务都返回成功,则返回成功;如果至少有一个任务返回失败,则返回失败。 你将如何实施它?我将使用util。同时

  • 问题内容: 我有一个使用外部jar的应用程序。我使用了eclipse,效果很好。我从eclipse导出为jar,创建了一个清单文件,该文件具有Class- Path:./cab.v1.jar,我将两个jar都放在了同一目录中。我在命令行中运行:java -jar myApp.jar 并为cab.v1.jar(另一个jar)中的类获取 java.lang.NoClassDefFoundError 也