Netflix/conductor学习笔记(1)

谭景福
2023-12-01

Netflix/conductor入门

github地址:https://github.com/Netflix/conductor

一、基本概念

Workflow

工作流是您的流程流的容器。它可以包括几种不同类型的任务,子工作流,相互连接的输入和输出,以有效地获得所需的结果。

Workflow 定义

工作流是使用基于JSON的DSL定义的,包括作为工作流一部分执行的一组任务。这些任务可以是在远程机器上执行的控制任务(fork、conditional等)或应用程序任务(例如,对文件进行编码)。

Tasks

任务是工作流的基础。工作流程中必须至少有一项任务。
任务可以分为两种类型:

  • Systems tasks - 由Conductor服务器执行。
  • Worker tasks - 由您自己的workers执行。

Task定义

任务定义有助于定义任务级别的参数,例如输入和输出,超时,重试等。

  • 所有任务都必须先注册,然后才能由活动工作流使用。
  • 一个任务可以在多个工作流中重复使用。

System Tasks

系统任务在Conductor服务器的JVM中执行,并由Conductor对其执行和可伸缩性进行管理。

Conductor提供了一个API,用于创建用户定义的任务,这些任务在与引擎相同的JVM中执行。

public class WorkflowSystemTask {

	private static Map<String, WorkflowSystemTask> registry = new HashMap<>();
	
	private String name;
	
	public WorkflowSystemTask(String name) {
		this.name = name;
		registry.put(name, this);
		SystemTaskWorkerCoordinator.add(this);
	}

   /**
	 * 开始执行任务
	 * @param 工作流程正在为其启动任务的工作流程
	 * @param 任务任务实例
	 * @param 执行器工作流执行器
	 */
	public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
		//Do nothing unless overridden by the task implementation
	}
	
	/**
	 * 
     * @param 工作流程正在为其启动任务的工作流程
	 * @param 任务任务实例
	 * @param 执行器工作流执行器
	 * @return true:如果执行更改了任务状态。否则返回false。
	 */
	public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor) {
		return false;
	}
	
	/**
	 * 取消任务执行
	 * @param workflow Workflow for which the task is being started
	 * @param task Instance of the Task
	 * @param executor Workflow Executor
	 */
	public void cancel(Workflow workflow, Task task, WorkflowExecutor executor) {
	}
	
	/**
	 * 
	 * @return True if the task is supposed to be started asynchronously using internal queues.
	 */
	public boolean isAsync() {
		return false;
	}

	/**
	 * @return True 以使任务保持在“ IN_PROGRESS”状态,并在以后通过外部消息保持“ COMPLETE”状态。
	 * @return True to keep task in 'IN_PROGRESS' state, and 'COMPLETE' later by an external message.
	 */
	public boolean isAsyncComplete(Task task) {
		if (task.getInputData().containsKey("asyncComplete")) {
			return Optional.ofNullable(task.getInputData().get("asyncComplete"))
				.map(result -> (Boolean) result)
				.orElse(false);
		} else {
			return Optional.ofNullable(task.getWorkflowTask())
				.map(WorkflowTask::isAsyncComplete)
				.orElse(false);
		}
	}
	
	/**
	 * 
     * @return时间(以秒为单位),如果速率受限或在启动方法执行后仍处于in_progress,则应重试此时间。
	 * @return Time in seconds after which the task should be retried if rate limited or remains in in_progress after start method execution. 
	 */
	public int getRetryTimeInSecond() {
		return 30;
	}
	/**
	 * 
	 * @return name of the system task
	 */
	public String getName(){
		return name;
	}
	
	@Override
	public String toString() {
		return name;
	}
	
	public static boolean is(String type){
		return registry.containsKey(type);
	}
	
	public static WorkflowSystemTask get(String type) {
		return registry.get(type);
	}
	
	public static Collection<WorkflowSystemTask> all() {
		return registry.values();
	}
	
}

Worker Tasks

Worker tasks由应用实现,并在与Conductor分开的环境中运行。可以使用任何语言来实现工作程序任务。这些任务通过REST/gRPC与Conductor服务器通信,以轮询任务并在执行后更新其状态。


二、使用客户端

客户端API

Conductor提供以下Java客户端以与各种API进行交互

客户端用法
Metadata Client注册 / 更新工作流程 和 任务定义
Workflow Client开始新的工作流程 / 获取工作流程的执行状态
Task Client轮询任务 / 执行后更新任务结果 / 获取任务状态

Task Client例子:位于tags:0.01 test的com.netfix.conductor.client包下的main类

package com.netflix.conductor.client.sample;

import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;

import java.util.Arrays;

public class Main {

    public static void main(String[] args) {

        TaskClient taskClient = new TaskClient();
        taskClient.setRootURI("http://localhost:8080/api/");        //Point this to the server API 将其指向服务器API

        // 用于执行辅助进程的线程数。为避免挨饿,应与worker人数相同或多
        int threadCount = 2;            //number of threads used to execute workers.  To avoid starvation, should be same or more than number of workers

        Worker worker1 = new SampleWorker("task_1");
        Worker worker2 = new SampleWorker("task_5");

        // Create TaskRunnerConfigurer
        TaskRunnerConfigurer configurer =
                new TaskRunnerConfigurer
                        .Builder(taskClient, Arrays.asList(worker1, worker2))
                        .withThreadCount(threadCount).build();

        // 开始轮询和执行任务
        // Start the polling and execution of tasks
        configurer.init();
    }
}

Worker

Conductor提供了一个自动化的框架来轮询任务、管理执行线程并将执行状态更新回服务器。
实现 Worker 接口来执行任务。

Worker worker1 = new SampleWorker("task_1");

TaskRunnerConfigurer

TaskRunnerConfigurer可用于注册worker(s)并初始化轮询循环。
管理task workers线程池和服务器通信(轮询和任务更新)。

使用 Builder 创建TaskRunnerConfigurer的实例。
构建器接受以下参数:

  • TaskClient:TaskClient被用于与Conductor服务器通信
  • Workers:Workers将用于轮询工作和任务执行的工作程序
		private int threadCount = -1;
		// Iterable<T> 接口提供一个增强 for 循环的实现
        public Builder(TaskClient taskClient, Iterable<Worker> workers) {
            // Preconditions前提条件
            Preconditions.checkNotNull(taskClient, "TaskClient cannot be null");
            Preconditions.checkNotNull(workers, "Workers cannot be null");
            this.taskClient = taskClient;
            this.workers = workers;
        }
		------------------------------------------------------
		private TaskRunnerConfigurer(Builder builder) {
			this.threadCount = (builder.threadCount == -1) ? workers.size() : builder.threadCount; //默认值
范围描述默认值
withEurekaClientEurekaClient用于标识服务器是否在发现中。当服务器消失后,轮询将停止。如果传递null,则不执行发现检查。由平台提供
withThreadCount分配给工作线程的线程数。应该至少是task Workers的大小,以避免在繁忙的系统中出现饥饿。注册workers数
withSleepWhenRetry重试操作之前,在任务更新调用失败时线程应休眠的时间(以毫秒为单位)。500
withUpdateRetryCount当更新状态调用失败时,更新任务状态时要进行的尝试次数。3
withWorkerNamePrefix将用于所有workers的字符串前缀。workflow-worker-
 new TaskRunnerConfigurer
                        .Builder(taskClient, Arrays.asList(worker1, worker2))
                        .withThreadCount(threadCount).build();

创建实例后,调用init()方法以初始化TaskPollExecutor并开始轮询和执行任务。

	/**
     * 开始轮询。必须在{@link TaskRunnerConfigurer.Builder#build()}之后调用方法。
     * 1. synchronized
     */
    public synchronized void init() {
        this.taskPollExecutor = new TaskPollExecutor(eurekaClient, taskClient, threadCount,
            updateRetryCount, taskToDomain, workerNamePrefix);

        this.scheduledExecutorService = Executors.newScheduledThreadPool(workers.size());
        // 2. 线程池Executors.newScheduledThreadPool
        // scheduleWithFixedDelay:延迟worker.getPollingInterval()毫秒后,每个任务执行完延迟worker.getPollingInterval()秒再执行1次
        workers.forEach(
            worker -> scheduledExecutorService.scheduleWithFixedDelay(() -> taskPollExecutor.pollAndExecute(worker),
                worker.getPollingInterval(), worker.getPollingInterval(), TimeUnit.MILLISECONDS));
    }

Worker类属性

属性类型描述默认值
pausedboolean如果设置为true,则工作程序停止轮询。false
pollIntervalint轮询服务器执行任务的时间间隔(以毫秒为单位)。1000
	/**
     * Override this method to pause the worker from polling.
     * 重写此方法以暂停工作进程的轮询。
     *
     * @return true if the worker is paused and no more tasks should be polled from server.
     */
    default boolean paused() {
        return PropertyFactory.getBoolean(getTaskDefName(), "paused", false);
    }
    
	/**
     * Override this method to change the interval between polls.
     * 重写此方法以更改轮询之间的间隔。
     *
     * @return interval in millisecond at which the server should be polled for worker tasks.
     */
    default int getPollingInterval() {
        return PropertyFactory.getInteger(getTaskDefName(), "pollInterval", 1000);
    }

为确保TaskRunnerConfigurer在实例变得不正常时应该停止轮询任务,在应用程序中的PreDestroy块中调用shutdown()。

public void shutdown() {
        taskPollExecutor.shutdownExecutorService(scheduledExecutorService);
    }

------------------------------------------------------------
void shutdownExecutorService(ExecutorService executorService) {
        int timeout = 10;
        try {
            if (executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
                LOGGER.debug("tasks completed, shutting down");
            } else {
                LOGGER.warn(String.format("forcing shutdown after waiting for %s second", timeout));
                executorService.shutdownNow();
            }
        } catch (InterruptedException ie) {
            LOGGER.warn("shutdown interrupted, invoking shutdownNow");
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

关闭线程池awaitTermination和shutdownNow的区别:https://blog.csdn.net/liwenxia626/article/details/80754886

 类似资料: