我有一个任务列表[Task-a,Task-B,Task.C,Task-D,…]
一个任务可以选择性地依赖于其他任务。
例如:
A可以依赖于3个任务:B、C和D
B可以依赖于2个任务:C和E
它基本上是一个有向无环图,任务的执行应该只有在依赖任务执行之后才能发生。
现在,在任何时间点,都可能有多个任务可供执行。在这种情况下,我们可以并行运行它们。
关于如何在尽可能多的并行性的同时实现这样的执行,您有什么想法吗?
class Task{
private String name;
private List<Task> dependentTasks;
public void run(){
// business logic
}
}
我们可以创建一个 DAG,其中图形的每个顶点是任务之一。
之后,我们可以计算其拓扑排序顺序。
然后,我们可以用优先级字段修饰 Task 类,并使用优先级阻止队列
运行 ThreadPoolExecutor
,该队列使用优先级字段比较任务。
最后一个技巧是覆盖run()
以首先等待所有依赖任务完成。
由于每个任务无限期地等待其相关任务完成,我们不能让线程池完全被拓扑排序顺序较高的任务占据;线程池将永远卡住<为了避免这种情况,我们只需要根据拓扑顺序为任务分配优先级。
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Testing {
private static Callable<Void> getCallable(String taskId){
return () -> {
System.out.println(String.format("Task %s result", taskId));
Thread.sleep(100);
return null;
};
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<Void> taskA = getCallable("A");
Callable<Void> taskB = getCallable("B");
Callable<Void> taskC = getCallable("C");
Callable<Void> taskD = getCallable("D");
Callable<Void> taskE = getCallable("E");
PrioritizedFutureTask<Void> pfTaskA = new PrioritizedFutureTask<>(taskA);
PrioritizedFutureTask<Void> pfTaskB = new PrioritizedFutureTask<>(taskB);
PrioritizedFutureTask<Void> pfTaskC = new PrioritizedFutureTask<>(taskC);
PrioritizedFutureTask<Void> pfTaskD = new PrioritizedFutureTask<>(taskD);
PrioritizedFutureTask<Void> pfTaskE = new PrioritizedFutureTask<>(taskE);
// Create a DAG graph.
pfTaskB.addDependency(pfTaskC).addDependency(pfTaskE);
pfTaskA.addDependency(pfTaskB).addDependency(pfTaskC).addDependency(pfTaskD);
// Now that we have a graph, we can just get its topological sorted order.
List<PrioritizedFutureTask<Void>> topological_sort = new ArrayList<>();
topological_sort.add(pfTaskE);
topological_sort.add(pfTaskC);
topological_sort.add(pfTaskB);
topological_sort.add(pfTaskD);
topological_sort.add(pfTaskA);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(1, new CustomRunnableComparator()));
// Its important to insert the tasks in the topological sorted order, otherwise its possible that the thread pool will be stuck forever.
for (int i = 0; i < topological_sort.size(); i++) {
PrioritizedFutureTask<Void> pfTask = topological_sort.get(i);
pfTask.setPriority(i);
// The lower the priority, the sooner it will run.
executor.execute(pfTask);
}
}
}
class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask<T>> {
private Integer _priority = 0;
private final Callable<T> callable;
private final List<PrioritizedFutureTask> _dependencies = new ArrayList<>();
;
public PrioritizedFutureTask(Callable<T> callable) {
super(callable);
this.callable = callable;
}
public PrioritizedFutureTask(Callable<T> callable, Integer priority) {
this(callable);
_priority = priority;
}
public Integer getPriority() {
return _priority;
}
public PrioritizedFutureTask<T> setPriority(Integer priority) {
_priority = priority;
return this;
}
public PrioritizedFutureTask<T> addDependency(PrioritizedFutureTask dep) {
this._dependencies.add(dep);
return this;
}
@Override
public void run() {
for (PrioritizedFutureTask dep : _dependencies) {
try {
dep.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
super.run();
}
@Override
public int compareTo(PrioritizedFutureTask<T> other) {
if (other == null) {
throw new NullPointerException();
}
return getPriority().compareTo(other.getPriority());
}
}
class CustomRunnableComparator implements Comparator<Runnable> {
@Override
public int compare(Runnable task1, Runnable task2) {
return ((PrioritizedFutureTask) task1).compareTo((PrioritizedFutureTask) task2);
}
}
输出:
Task E result
Task C result
Task B result
Task D result
Task A result
PS:这是Python中经过良好测试的简单拓扑排序实现,您可以轻松地将其移植到Java。
另一个答案很好,但太复杂了。
一种更简单的方法是并行执行卡恩的算法。
关键是并行执行已执行所有依赖项的所有任务。
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
class DependencyManager {
private final ConcurrentHashMap<String, List<String>> _dependencies = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<String>> _reverseDependencies = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Runnable> _tasks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Integer> _numDependenciesExecuted = new ConcurrentHashMap<>();
private final AtomicInteger _numTasksExecuted = new AtomicInteger(0);
private final ExecutorService _executorService = Executors.newFixedThreadPool(16);
private static Runnable getRunnable(DependencyManager dependencyManager, String taskId){
return () -> {
try {
Thread.sleep(2000); // A task takes 2 seconds to finish.
dependencyManager.taskCompleted(taskId);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}
/**
* In case a vertex is disconnected from the rest of the graph.
* @param taskId The task id
*/
public void addVertex(String taskId) {
_dependencies.putIfAbsent(taskId, new ArrayList<>());
_reverseDependencies.putIfAbsent(taskId, new ArrayList<>());
_tasks.putIfAbsent(taskId, getRunnable(this, taskId));
_numDependenciesExecuted.putIfAbsent(taskId, 0);
}
private void addEdge(String dependentTaskId, String dependeeTaskId) {
_dependencies.get(dependentTaskId).add(dependeeTaskId);
_reverseDependencies.get(dependeeTaskId).add(dependentTaskId);
}
public void addDependency(String dependentTaskId, String dependeeTaskId) {
addVertex(dependentTaskId);
addVertex(dependeeTaskId);
addEdge(dependentTaskId, dependeeTaskId);
}
private void taskCompleted(String taskId) {
System.out.println(String.format("%s:: Task %s done!!", Instant.now(), taskId));
_numTasksExecuted.incrementAndGet();
_reverseDependencies.get(taskId).forEach(nextTaskId -> {
_numDependenciesExecuted.computeIfPresent(nextTaskId, (__, currValue) -> currValue + 1);
int numDependencies = _dependencies.get(nextTaskId).size();
int numDependenciesExecuted = _numDependenciesExecuted.get(nextTaskId);
if (numDependenciesExecuted == numDependencies) {
// All dependencies have been executed, so we can submit this task to the threadpool.
_executorService.submit(_tasks.get(nextTaskId));
}
});
if (_numTasksExecuted.get() == _tasks.size()) {
topoSortCompleted();
}
}
private void topoSortCompleted() {
System.out.println("Topo sort complete!!");
_executorService.shutdownNow();
}
public void executeTopoSort() {
System.out.println(String.format("%s:: Topo sort started!!", Instant.now()));
_dependencies.forEach((taskId, dependencies) -> {
if (dependencies.isEmpty()) {
_executorService.submit(_tasks.get(taskId));
}
});
}
}
public class TestParallelTopoSort {
public static void main(String[] args) {
DependencyManager dependencyManager = new DependencyManager();
dependencyManager.addDependency("8", "5");
dependencyManager.addDependency("7", "5");
dependencyManager.addDependency("7", "6");
dependencyManager.addDependency("6", "3");
dependencyManager.addDependency("6", "4");
dependencyManager.addDependency("5", "1");
dependencyManager.addDependency("5", "2");
dependencyManager.addDependency("5", "3");
dependencyManager.addDependency("4", "1");
dependencyManager.executeTopoSort();
// Parallel version takes 8 seconds to execute.
// Serial version would have taken 16 seconds.
}
}
此示例中构造的有向无环图是这样的:
问题内容: 我正在使用python 2.7,我有一些看起来像这样的代码: 此处唯一的依赖项如下:dependent1需要等待任务1-3,Dependent2需要等待任务4-6,而dependent3需要等待依赖项1-2 …以下是可以的:首先运行全部6个任务并行,然后是前两个从属。 我希望尽可能多的任务并行运行,我已经在Google上搜索了一些模块,但是我希望避免使用外部库,并且不确定队列线程技术如
我有一个任务类,它在执行前依赖于其他任务。我想将可以并行化的任务分组并排序。我决定它可以首先表示为DAG,并尝试使用JGrapht。首先,我遍历任务的输入列表,以获取所有具有依赖关系的任务,并将它们收集在一个列表中。然后,对于每个任务,我在图中创建一个顶点。 然后使用相同的列表,我试图创建节点之间的边。 然后我试着把任务分组 因此,结果是有序和分组的任务,例如图 结果将是 然而,当也是的前身时,它
考虑以下无向非循环图: 如果我们定义“根”为A和E,有没有算法可以确定产生的有向无环图?: 我考虑过从根开始尝试某种DFS或BFS,但我不确定如何处理“等待”的需要,以查看另一个根是否可能到达给定的节点。
在图论中,如果一个有向图从任意顶点出发无法经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。 因为有向图中一个点经过两种路线到达另一个点未必形成环,因此有向无环图未必能转化成树,但任何有向树均为有向无环图。 一、简介 有向无环图是图论的重要概念,我们将首先介绍图的概念和定义,随后介绍有向图,再逐渐引至有向无环图(DAG)。值得一提的是,当DAG用于指代模型时一般指向贝叶斯网络。 一个图G
我想写一个Gradle任务,在我所有的子项目中共享。此任务在调用它的子项目中查找所有其他类型为“GenerateMavenPom”的任务,并执行这些任务。 通过这样做,我的子项目可以定义他们想要的任何Maven发布,我可以使用“gradle GenerateMavenPomFiles”等单个任务执行gradle来创建pom.xml,而不需要知道每个子项目中的单个发布类型。为什么?因为Maven插件
我找不到关于我们被要求进行的调查的具体答案 我看到并行流在使用少量线程时性能可能不是那么好,而且当DB在处理当前请求的同时阻止下一个请求时,它的表现显然也不是那么好 然而,我发现实现任务执行器与并行流的开销是巨大的,我们实现了一个POC,它只需要这一行代码就能满足并发需求: 而在Task Executor中,我们需要重写Runnable接口并编写一些繁琐的代码,以使Runnable不是空的,并返回