当前位置: 首页 > 工具软件 > PowerJob > 使用案例 >

分布式任务调度与计算框架:PowerJob 高级特性-OpenAPI 04

胡飞舟
2023-12-01

OpenAPI

**OpenAPI 允许开发者通过接口来完成手工的操作,让系统整体变得更加灵活。开发者可以基于 API 便捷地扩展PowerJob 原有的功能,比如,**全面定制自己的任务调度策略。

换句话说,通过 OpenAPI,可以让接入方自己实现 PowerJob 的整个任务管理与调度模块。

依赖

最新依赖版本请参考 Maven 中央仓库:推荐地址

<dependency>
  <groupId>tech.powerjob</groupId>
  <artifactId>powerjob-client</artifactId>
  <version>${latest.powerjob.version}</version>
</dependency>

简单示例

通过 OpenAPI 停止某个任务实例。

// 初始化 client,需要server地址和应用名称作为参数
PowerJobClient client = new PowerJobClient("127.0.0.1:7700", "oms-test", "password");
// 调用相关的API
client.stopInstance(1586855173043L)

API列表

任务(Job)相关

创建/修改任务

修改(更新)时也需要填写全部参数,不能只填写修改字段!

接口签名:ResultDTO<Long> saveJob(SaveJobInfoRequest request)

入参:任务信息(详细说明见下表)

返回值:ResultDTO,根据 success 判断操作是否成功。若操作成功,data 字段返回任务ID

属性说明
jobId任务 ID,可选,null 代表创建任务,否则填写需要修改的任务 ID
jobName任务名称
jobDescription任务描述
jobParams任务参数,Processor#process 方法入参 TaskContext 对象的 jobParams 字段
timeExpressionType时间表达式类型,枚举值
timeExpression时间表达式,填写类型由 timeExpressionType 决定,比如 CRON 需要填写 CRON 表达式
executeType执行类型,枚举值
processorType处理器类型,枚举值
processorInfo处理器参数,填写类型由 processorType 决定,如Java 处理器需要填写全限定类名,如:com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo
maxInstanceNum最大实例数,该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例)
concurrency单机线程并发数,表示该实例执行过程中每个Worker 使用的线程数量
instanceTimeLimit任务实例运行时间限制,0 代表无任何限制,超时会被打断并判定为执行失败
instanceRetryNum任务实例重试次数,整个任务失败时重试,代价大,不推荐使用
taskRetryNumTask 重试次数,每个子 Task 失败后单独重试,代价小,推荐使用
minCpuCores最小可用 CPU 核心数,CPU 可用核心数小于该值的 Worker 将不会执行该任务,0 代表无任何限制
minMemorySpace最小内存大小(GB),可用内存小于该值的Worker 将不会执行该任务,0 代表无任何限制
minDiskSpace最小磁盘大小(GB),可用磁盘空间小于该值的Worker 将不会执行该任务,0 代表无任何限制
designatedWorkers指定机器执行,设置该参数后只有列表中的机器允许执行该任务,空代表不指定机器
maxWorkerCount最大执行机器数量,限定调动执行的机器数量,0代表无限制
notifyUserIds接收报警的用户 ID 列表
enable是否启用该任务,未启用的任务不会被调度
dispatchStrategy调度策略,枚举,目前支持随机(RAMDOM)和 健康度优先(HEALTH_FIRST)
lifecycle生命周期(预留,用于指定定时调度任务的生效时间范围)
extra扩展字段(供开发者使用,用于功能扩展,powerjob 自身不会使用该字段)

查找任务

接口签名:ResultDTO<JobInfoDTO> fetchJob(Long jobId)

入参:任务ID

返回值:根据 success 判断操作是否成功,若请求成功则返回任务的详细信息

禁用某个任务

接口签名:ResultDTO<Void> disableJob(Long jobId)

入参:任务 ID

返回值:根据 success 判断操作是否成功

启用某个任务

接口签名:ResultDTO<Void> enableJob(Long jobId)

入参:任务 ID

返回值:根据 success 判断操作是否成功

删除某个任务

接口签名:ResultDTO<Void> deleteJob(Long jobId)

入参:任务 ID

返回值:根据 success 判断操作是否成功

运行某个任务(支持延迟执行)

接口签名:ResultDTO<Long> runJob(Long jobId, String instanceParams, long delayMS)

入参:任务 ID + 任务实例参数(Processor#process 方法入参 TaskContext 对象的 instanceParams 字段)+ 延迟执行时间

返回值:根据 success 判断操作是否成功,操作成功返回对应的任务实例 ID (instanceId)

任务实例(Instance)相关

取消某个定时任务实例

接口签名:ResultDTO<Void> cancelInstance(Long instanceId)

入参:任务实例 ID

返回值:根据 success 判断操作是否成功

停止某个任务实例

接口签名:ResultDTO<Void> stopInstance(Long instanceId)

入参:任务实例 ID

返回值:根据 success 判断操作是否成功

查询某个任务实例

接口签名:ResultDTO<InstanceInfoDTO> fetchInstanceInfo(Long instanceId)

入参:任务实例 ID

返回值:根据 success 判断操作是否成功,操作成功返回任务实例的详细信息

查询某个任务实例的状态

接口签名:ResultDTO<Integer> fetchInstanceStatus(Long instanceId)

入参:任务实例 ID

返回值:根据 success 判断操作是否成功,操作成功返回任务实例的状态码,对应的枚举为:InstanceStatus

工作流(Workflow)相关

创建/修改工作流

修改(更新)时也需要填写全部参数,不能只填写修改字段!

接口签名:ResultDTO<Long> saveWorkflow(SaveWorkflowRequest workflowInfo)

入参:工作流信息

返回值:ResultDTO,根据 success 判断操作是否成功。若操作成功,data 字段返回工作流 ID

属性说明
id工作流 ID,可选,仅更新时需要( id 为 null 代表新增数据,id 不为 null 代表更新)
wfName工作流名称
wfDescription工作流描述
dag工作流中的任务依赖配置,点+线构成的 DAG 描述
timeExpressionType时间表达式类型,枚举值,仅支持 CRON 和 API
timeExpression时间表达式,填写类型由 timeExpressionType 决定,比如 CRON 需要填写 CRON 表达式
maxWfInstanceNum最大工作流实例数
notifyUserIds接收报警的用户 ID 列表
enable是否启用该工作流,未启用的工作流不会被调度

查找工作流

接口签名:ResultDTO<JobInfoDTO> fetchWorkflow(Long workflowId)

入参:工作流 ID

返回值:根据 success 判断操作是否成功,若请求成功则返回工作流的详细信息

禁用某个工作流

接口签名:ResultDTO<Void> disableWorkflow(Long workflowId)

入参:工作流 ID

返回值:根据 success 判断操作是否成功

启用某个工作流

接口签名:ResultDTO<Void> enableWorkflow(Long workflowId)

入参:工作流 ID

返回值:根据 success 判断操作是否成功

删除某个工作流

接口签名:ResultDTO<Void> deleteWorkflow(Long workflowId)

入参:工作流 ID

返回值:根据 success 判断操作是否成功

立即运行某个工作流

接口签名:ResultDTO<Long> runWorkflow(Long workflowId)

入参:工作流 ID

返回值:根据 success 判断操作是否成功,操作成功返回对应的工作流实例 ID (wfInstanceId)

运行某个工作流(指定初始参数、延迟)

接口签名:ResultDTO<Long> runWorkflow(Long workflowId, String initParams, long delayMS)

入参:工作流 ID(workflowId),初始参数(initParams),延迟(delayMS)

返回值:根据 success 判断操作是否成功,操作成功返回对应的工作流实例 ID (wfInstanceId)

工作流实例(WorkflowInstance)相关

停止某个工作流实例

接口签名:ResultDTO<Void> stopWorkflowInstance(Long wfInstanceId)

入参:工作流实例 ID (wfInstanceId)

返回值:根据 success 判断操作是否成功

查询某个工作流任务实例

接口签名:ResultDTO<WorkflowInstanceInfoDTO> fetchWorkflowInstanceInfo(Long wfInstanceId)

入参:工作流实例 ID (wfInstanceId)

返回值:根据 success 判断操作是否成功,操作成功返回工作流实例的详细信息

 类似资料: