当前位置: 首页 > 知识库问答 >
问题:

任务有向无环图的并行执行

戚建华
2023-03-14

我有一个任务列表[Task-a,Task-B,Task.C,Task-D,…]
一个任务可以选择性地依赖于其他任务。

例如:
A可以依赖于3个任务:B、C和D
B可以依赖于2个任务:C和E

它基本上是一个有向无环图,任务的执行应该只有在依赖任务执行之后才能发生。

现在,在任何时间点,都可能有多个任务可供执行。在这种情况下,我们可以并行运行它们。

关于如何在尽可能多的并行性的同时实现这样的执行,您有什么想法吗?

class Task{
     private String name;
     private List<Task> dependentTasks;
     
     public void run(){
     // business logic
     }
}

共有2个答案

易元青
2023-03-14

我们可以创建一个 DAG,其中图形的每个顶点是任务之一。
之后,我们可以计算其拓扑排序顺序。
然后,我们可以用优先级字段修饰 Task 类,并使用优先级阻止队列运行 ThreadPoolExecutor,该队列使用优先级字段比较任务。

最后一个技巧是覆盖run()以首先等待所有依赖任务完成。

由于每个任务无限期地等待其相关任务完成,我们不能让线程池完全被拓扑排序顺序较高的任务占据;线程池将永远卡住<为了避免这种情况,我们只需要根据拓扑顺序为任务分配优先级。

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class Testing {

  private static Callable<Void> getCallable(String taskId){
    return () -> {
      System.out.println(String.format("Task %s result", taskId));
      Thread.sleep(100);
      return null;
    };
  }

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Callable<Void> taskA = getCallable("A");
    Callable<Void> taskB = getCallable("B");
    Callable<Void> taskC = getCallable("C");
    Callable<Void> taskD = getCallable("D");
    Callable<Void> taskE = getCallable("E");
    PrioritizedFutureTask<Void> pfTaskA = new PrioritizedFutureTask<>(taskA);
    PrioritizedFutureTask<Void> pfTaskB = new PrioritizedFutureTask<>(taskB);
    PrioritizedFutureTask<Void> pfTaskC = new PrioritizedFutureTask<>(taskC);
    PrioritizedFutureTask<Void> pfTaskD = new PrioritizedFutureTask<>(taskD);
    PrioritizedFutureTask<Void> pfTaskE = new PrioritizedFutureTask<>(taskE);
    // Create a DAG graph.
    pfTaskB.addDependency(pfTaskC).addDependency(pfTaskE);
    pfTaskA.addDependency(pfTaskB).addDependency(pfTaskC).addDependency(pfTaskD);
    // Now that we have a graph, we can just get its topological sorted order.
    List<PrioritizedFutureTask<Void>> topological_sort = new ArrayList<>();
    topological_sort.add(pfTaskE);
    topological_sort.add(pfTaskC);
    topological_sort.add(pfTaskB);
    topological_sort.add(pfTaskD);
    topological_sort.add(pfTaskA);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
        new PriorityBlockingQueue<Runnable>(1, new CustomRunnableComparator()));
    // Its important to insert the tasks in the topological sorted order, otherwise its possible that the thread pool will be stuck forever.
    for (int i = 0; i < topological_sort.size(); i++) {
      PrioritizedFutureTask<Void> pfTask = topological_sort.get(i);
      pfTask.setPriority(i);
      // The lower the priority, the sooner it will run.
      executor.execute(pfTask);
    }
  }
}

class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask<T>> {

  private Integer _priority = 0;
  private final Callable<T> callable;
  private final List<PrioritizedFutureTask> _dependencies = new ArrayList<>();
  ;

  public PrioritizedFutureTask(Callable<T> callable) {
    super(callable);
    this.callable = callable;
  }

  public PrioritizedFutureTask(Callable<T> callable, Integer priority) {
    this(callable);
    _priority = priority;
  }

  public Integer getPriority() {
    return _priority;
  }

  public PrioritizedFutureTask<T> setPriority(Integer priority) {
    _priority = priority;
    return this;
  }

  public PrioritizedFutureTask<T> addDependency(PrioritizedFutureTask dep) {
    this._dependencies.add(dep);
    return this;
  }

  @Override
  public void run() {
    for (PrioritizedFutureTask dep : _dependencies) {
      try {
        dep.get();
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
    }
    super.run();
  }

  @Override
  public int compareTo(PrioritizedFutureTask<T> other) {
    if (other == null) {
      throw new NullPointerException();
    }
    return getPriority().compareTo(other.getPriority());
  }
}

class CustomRunnableComparator implements Comparator<Runnable> {
  @Override
  public int compare(Runnable task1, Runnable task2) {
    return ((PrioritizedFutureTask) task1).compareTo((PrioritizedFutureTask) task2);
  }
}

输出:

Task E result
Task C result
Task B result
Task D result
Task A result

PS:这是Python中经过良好测试的简单拓扑排序实现,您可以轻松地将其移植到Java。

宗波涛
2023-03-14

另一个答案很好,但太复杂了。

一种更简单的方法是并行执行卡恩的算法。

关键是并行执行已执行所有依赖项的所有任务。

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;


class DependencyManager {
private final ConcurrentHashMap<String, List<String>> _dependencies = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<String>> _reverseDependencies = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Runnable> _tasks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Integer> _numDependenciesExecuted = new ConcurrentHashMap<>();
private final  AtomicInteger _numTasksExecuted = new AtomicInteger(0);
private final ExecutorService _executorService = Executors.newFixedThreadPool(16);

private static Runnable getRunnable(DependencyManager dependencyManager, String taskId){
    return () -> {
    try {
        Thread.sleep(2000);  // A task takes 2 seconds to finish.
        dependencyManager.taskCompleted(taskId);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    };
}

/**
* In case a vertex is disconnected from the rest of the graph.
* @param taskId The task id
*/
public void addVertex(String taskId) {
    _dependencies.putIfAbsent(taskId, new ArrayList<>());
    _reverseDependencies.putIfAbsent(taskId, new ArrayList<>());
    _tasks.putIfAbsent(taskId, getRunnable(this, taskId));
    _numDependenciesExecuted.putIfAbsent(taskId, 0);
}

private void addEdge(String dependentTaskId, String dependeeTaskId) {
    _dependencies.get(dependentTaskId).add(dependeeTaskId);
    _reverseDependencies.get(dependeeTaskId).add(dependentTaskId);
}

public void addDependency(String dependentTaskId, String dependeeTaskId) {
    addVertex(dependentTaskId);
    addVertex(dependeeTaskId);
    addEdge(dependentTaskId, dependeeTaskId);
}

private void taskCompleted(String taskId) {
    System.out.println(String.format("%s:: Task %s done!!", Instant.now(), taskId));
    _numTasksExecuted.incrementAndGet();
    _reverseDependencies.get(taskId).forEach(nextTaskId -> {
        _numDependenciesExecuted.computeIfPresent(nextTaskId, (__, currValue) -> currValue + 1);
        int numDependencies = _dependencies.get(nextTaskId).size();
        int numDependenciesExecuted = _numDependenciesExecuted.get(nextTaskId);
        if (numDependenciesExecuted == numDependencies) {
        // All dependencies have been executed, so we can submit this task to the threadpool. 
            _executorService.submit(_tasks.get(nextTaskId));
        }
        });
    if (_numTasksExecuted.get() == _tasks.size()) {
        topoSortCompleted();
    }
}

private void topoSortCompleted() {
    System.out.println("Topo sort complete!!");
    _executorService.shutdownNow();
}

public void executeTopoSort() {
    System.out.println(String.format("%s:: Topo sort started!!", Instant.now()));
    _dependencies.forEach((taskId, dependencies) -> {
    if (dependencies.isEmpty()) {
        _executorService.submit(_tasks.get(taskId));
    }
    });
}
}

public class TestParallelTopoSort {

public static void main(String[] args) {
    DependencyManager dependencyManager = new DependencyManager();
    dependencyManager.addDependency("8", "5");
    dependencyManager.addDependency("7", "5");
    dependencyManager.addDependency("7", "6");
    dependencyManager.addDependency("6", "3");
    dependencyManager.addDependency("6", "4");
    dependencyManager.addDependency("5", "1");
    dependencyManager.addDependency("5", "2");
    dependencyManager.addDependency("5", "3");
    dependencyManager.addDependency("4", "1");
    dependencyManager.executeTopoSort();
    // Parallel version takes 8 seconds to execute.
    // Serial version would have taken 16 seconds.

}
}

此示例中构造的有向无环图是这样的:

 类似资料:
  • 问题内容: 我正在使用python 2.7,我有一些看起来像这样的代码: 此处唯一的依赖项如下:dependent1需要等待任务1-3,Dependent2需要等待任务4-6,而dependent3需要等待依赖项1-2 …以下是可以的:首先运行全部6个任务并行,然后是前两个从属。 我希望尽可能多的任务并行运行,我已经在Google上搜索了一些模块,但是我希望避免使用外部库,并且不确定队列线程技术如

  • 考虑以下无向非循环图: 如果我们定义“根”为A和E,有没有算法可以确定产生的有向无环图?: 我考虑过从根开始尝试某种DFS或BFS,但我不确定如何处理“等待”的需要,以查看另一个根是否可能到达给定的节点。

  • 我有一个任务类,它在执行前依赖于其他任务。我想将可以并行化的任务分组并排序。我决定它可以首先表示为DAG,并尝试使用JGrapht。首先,我遍历任务的输入列表,以获取所有具有依赖关系的任务,并将它们收集在一个列表中。然后,对于每个任务,我在图中创建一个顶点。 然后使用相同的列表,我试图创建节点之间的边。 然后我试着把任务分组 因此,结果是有序和分组的任务,例如图 结果将是 然而,当也是的前身时,它

  • 在图论中,如果一个有向图从任意顶点出发无法经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。 因为有向图中一个点经过两种路线到达另一个点未必形成环,因此有向无环图未必能转化成树,但任何有向树均为有向无环图。 一、简介 有向无环图是图论的重要概念,我们将首先介绍图的概念和定义,随后介绍有向图,再逐渐引至有向无环图(DAG)。值得一提的是,当DAG用于指代模型时一般指向贝叶斯网络。 一个图G

  • 我想写一个Gradle任务,在我所有的子项目中共享。此任务在调用它的子项目中查找所有其他类型为“GenerateMavenPom”的任务,并执行这些任务。 通过这样做,我的子项目可以定义他们想要的任何Maven发布,我可以使用“gradle GenerateMavenPomFiles”等单个任务执行gradle来创建pom.xml,而不需要知道每个子项目中的单个发布类型。为什么?因为Maven插件

  • 我找不到关于我们被要求进行的调查的具体答案 我看到并行流在使用少量线程时性能可能不是那么好,而且当DB在处理当前请求的同时阻止下一个请求时,它的表现显然也不是那么好 然而,我发现实现任务执行器与并行流的开销是巨大的,我们实现了一个POC,它只需要这一行代码就能满足并发需求: 而在Task Executor中,我们需要重写Runnable接口并编写一些繁琐的代码,以使Runnable不是空的,并返回