github地址:https://github.com/Netflix/conductor
工作流是您的流程流的容器。它可以包括几种不同类型的任务,子工作流,相互连接的输入和输出,以有效地获得所需的结果。
工作流是使用基于JSON的DSL定义的,包括作为工作流一部分执行的一组任务。这些任务可以是在远程机器上执行的控制任务(fork、conditional等)或应用程序任务(例如,对文件进行编码)。
任务是工作流的基础。工作流程中必须至少有一项任务。
任务可以分为两种类型:
任务定义有助于定义任务级别的参数,例如输入和输出,超时,重试等。
系统任务在Conductor服务器的JVM中执行,并由Conductor对其执行和可伸缩性进行管理。
Conductor提供了一个API,用于创建用户定义的任务,这些任务在与引擎相同的JVM中执行。
public class WorkflowSystemTask {
private static Map<String, WorkflowSystemTask> registry = new HashMap<>();
private String name;
public WorkflowSystemTask(String name) {
this.name = name;
registry.put(name, this);
SystemTaskWorkerCoordinator.add(this);
}
/**
* 开始执行任务
* @param 工作流程正在为其启动任务的工作流程
* @param 任务任务实例
* @param 执行器工作流执行器
*/
public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
//Do nothing unless overridden by the task implementation
}
/**
*
* @param 工作流程正在为其启动任务的工作流程
* @param 任务任务实例
* @param 执行器工作流执行器
* @return true:如果执行更改了任务状态。否则返回false。
*/
public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor) {
return false;
}
/**
* 取消任务执行
* @param workflow Workflow for which the task is being started
* @param task Instance of the Task
* @param executor Workflow Executor
*/
public void cancel(Workflow workflow, Task task, WorkflowExecutor executor) {
}
/**
*
* @return True if the task is supposed to be started asynchronously using internal queues.
*/
public boolean isAsync() {
return false;
}
/**
* @return True 以使任务保持在“ IN_PROGRESS”状态,并在以后通过外部消息保持“ COMPLETE”状态。
* @return True to keep task in 'IN_PROGRESS' state, and 'COMPLETE' later by an external message.
*/
public boolean isAsyncComplete(Task task) {
if (task.getInputData().containsKey("asyncComplete")) {
return Optional.ofNullable(task.getInputData().get("asyncComplete"))
.map(result -> (Boolean) result)
.orElse(false);
} else {
return Optional.ofNullable(task.getWorkflowTask())
.map(WorkflowTask::isAsyncComplete)
.orElse(false);
}
}
/**
*
* @return时间(以秒为单位),如果速率受限或在启动方法执行后仍处于in_progress,则应重试此时间。
* @return Time in seconds after which the task should be retried if rate limited or remains in in_progress after start method execution.
*/
public int getRetryTimeInSecond() {
return 30;
}
/**
*
* @return name of the system task
*/
public String getName(){
return name;
}
@Override
public String toString() {
return name;
}
public static boolean is(String type){
return registry.containsKey(type);
}
public static WorkflowSystemTask get(String type) {
return registry.get(type);
}
public static Collection<WorkflowSystemTask> all() {
return registry.values();
}
}
Worker tasks由应用实现,并在与Conductor分开的环境中运行。可以使用任何语言来实现工作程序任务。这些任务通过REST/gRPC与Conductor服务器通信,以轮询任务并在执行后更新其状态。
Conductor提供以下Java客户端以与各种API进行交互
客户端 | 用法 |
---|---|
Metadata Client | 注册 / 更新工作流程 和 任务定义 |
Workflow Client | 开始新的工作流程 / 获取工作流程的执行状态 |
Task Client | 轮询任务 / 执行后更新任务结果 / 获取任务状态 |
Task Client例子:位于tags:0.01 test的com.netfix.conductor.client包下的main类
package com.netflix.conductor.client.sample;
import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import java.util.Arrays;
public class Main {
public static void main(String[] args) {
TaskClient taskClient = new TaskClient();
taskClient.setRootURI("http://localhost:8080/api/"); //Point this to the server API 将其指向服务器API
// 用于执行辅助进程的线程数。为避免挨饿,应与worker人数相同或多
int threadCount = 2; //number of threads used to execute workers. To avoid starvation, should be same or more than number of workers
Worker worker1 = new SampleWorker("task_1");
Worker worker2 = new SampleWorker("task_5");
// Create TaskRunnerConfigurer
TaskRunnerConfigurer configurer =
new TaskRunnerConfigurer
.Builder(taskClient, Arrays.asList(worker1, worker2))
.withThreadCount(threadCount).build();
// 开始轮询和执行任务
// Start the polling and execution of tasks
configurer.init();
}
}
Conductor提供了一个自动化的框架来轮询任务、管理执行线程并将执行状态更新回服务器。
实现 Worker 接口来执行任务。
Worker worker1 = new SampleWorker("task_1");
TaskRunnerConfigurer可用于注册worker(s)并初始化轮询循环。
管理task workers线程池和服务器通信(轮询和任务更新)。
使用 Builder 创建TaskRunnerConfigurer的实例。
构建器接受以下参数:
private int threadCount = -1;
// Iterable<T> 接口提供一个增强 for 循环的实现
public Builder(TaskClient taskClient, Iterable<Worker> workers) {
// Preconditions前提条件
Preconditions.checkNotNull(taskClient, "TaskClient cannot be null");
Preconditions.checkNotNull(workers, "Workers cannot be null");
this.taskClient = taskClient;
this.workers = workers;
}
------------------------------------------------------
private TaskRunnerConfigurer(Builder builder) {
this.threadCount = (builder.threadCount == -1) ? workers.size() : builder.threadCount; //默认值
范围 | 描述 | 默认值 |
---|---|---|
withEurekaClient | EurekaClient用于标识服务器是否在发现中。当服务器消失后,轮询将停止。如果传递null,则不执行发现检查。 | 由平台提供 |
withThreadCount | 分配给工作线程的线程数。应该至少是task Workers的大小,以避免在繁忙的系统中出现饥饿。 | 注册workers数 |
withSleepWhenRetry | 重试操作之前,在任务更新调用失败时线程应休眠的时间(以毫秒为单位)。 | 500 |
withUpdateRetryCount | 当更新状态调用失败时,更新任务状态时要进行的尝试次数。 | 3 |
withWorkerNamePrefix | 将用于所有workers的字符串前缀。 | workflow-worker- |
new TaskRunnerConfigurer
.Builder(taskClient, Arrays.asList(worker1, worker2))
.withThreadCount(threadCount).build();
创建实例后,调用init()方法以初始化TaskPollExecutor并开始轮询和执行任务。
/**
* 开始轮询。必须在{@link TaskRunnerConfigurer.Builder#build()}之后调用方法。
* 1. synchronized
*/
public synchronized void init() {
this.taskPollExecutor = new TaskPollExecutor(eurekaClient, taskClient, threadCount,
updateRetryCount, taskToDomain, workerNamePrefix);
this.scheduledExecutorService = Executors.newScheduledThreadPool(workers.size());
// 2. 线程池Executors.newScheduledThreadPool
// scheduleWithFixedDelay:延迟worker.getPollingInterval()毫秒后,每个任务执行完延迟worker.getPollingInterval()秒再执行1次
workers.forEach(
worker -> scheduledExecutorService.scheduleWithFixedDelay(() -> taskPollExecutor.pollAndExecute(worker),
worker.getPollingInterval(), worker.getPollingInterval(), TimeUnit.MILLISECONDS));
}
Worker类属性
属性 | 类型 | 描述 | 默认值 |
---|---|---|---|
paused | boolean | 如果设置为true,则工作程序停止轮询。 | false |
pollInterval | int | 轮询服务器执行任务的时间间隔(以毫秒为单位)。 | 1000 |
/**
* Override this method to pause the worker from polling.
* 重写此方法以暂停工作进程的轮询。
*
* @return true if the worker is paused and no more tasks should be polled from server.
*/
default boolean paused() {
return PropertyFactory.getBoolean(getTaskDefName(), "paused", false);
}
/**
* Override this method to change the interval between polls.
* 重写此方法以更改轮询之间的间隔。
*
* @return interval in millisecond at which the server should be polled for worker tasks.
*/
default int getPollingInterval() {
return PropertyFactory.getInteger(getTaskDefName(), "pollInterval", 1000);
}
为确保TaskRunnerConfigurer在实例变得不正常时应该停止轮询任务,在应用程序中的PreDestroy块中调用shutdown()。
public void shutdown() {
taskPollExecutor.shutdownExecutorService(scheduledExecutorService);
}
------------------------------------------------------------
void shutdownExecutorService(ExecutorService executorService) {
int timeout = 10;
try {
if (executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
LOGGER.debug("tasks completed, shutting down");
} else {
LOGGER.warn(String.format("forcing shutdown after waiting for %s second", timeout));
executorService.shutdownNow();
}
} catch (InterruptedException ie) {
LOGGER.warn("shutdown interrupted, invoking shutdownNow");
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
关闭线程池awaitTermination和shutdownNow的区别:https://blog.csdn.net/liwenxia626/article/details/80754886