关于hadoop的集成,请参考另外一篇文章,这里就过多的赘述:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-streaming</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<!-- oozie -->
<dependency>
<groupId>org.apache.oozie</groupId>
<artifactId>oozie-client</artifactId>
<version>4.3.0</version>
</dependency>
hdfs:
hdfsPath: hdfs://bigdata-master:8020
hdfsName: bigdata-master
oozie:
url: http://bigdata-master:11000/oozie
wf:
application:
path: hdfs://bigdata-master:9000/user/oozie/workflow/hiveserver2.xml
use:
system:
libpath: true
libpath: hdfs://bigdata-master:8020/user/oozie/share/lib
callback:
url: http://172.16.120.29:8080/label/oozie/callback?executeType=$1\&taskType=$2\&callbackId=$3
jdbc:
url: jdbc:hive2://192.168.150.119:10000/default
password:
nameNode: hdfs://bigdata-master:8020
resourceManager: hdfs://bigdata-master:8088
queueName: default
job-tracker: bigdata-master:8032
package com.winterchen.hadoopdemo.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @author winterchen
* @version 1.0
* @date 2020/11/19 7:21 下午
* @description
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Component
public class OozieConfig {
@Value("${oozie.nameNode}")
private String nameNode;
@Value("${oozie.job-tracker}")
private String jobTracker;
@Value("${oozie.resourceManager}")
private String resourceManager;
@Value("${oozie.queueName}")
private String queueName;
@Value("${oozie.url}")
private String url;
@Value("${oozie.wf.application.path}")
private String oozieApplicationPath;
@Value("${oozie.libpath}")
private String oozieLibPath;
@Value("${oozie.use.system.libpath}")
private boolean oozieSystemLibPath;
@Value("${oozie.jdbc.url}")
private String jdbcUrl;
@Value("${oozie.jdbc.password}")
private String password;
@Value("${oozie.callback.url}")
private String callbackUrl;
}
/**
* @author winterchen
* @version 1.0
* @date 2020/11/23 2:23 下午
* @description
**/
public class OozieConstants {
public static final String NAME_NODE= "nameNode";
public static final String RESOURCE_MANAGER = "resourcemanager";
public static final String QUEUE_NAME = "queueName";
public static final String ROOT_DIR = "rootdir";
public static final String JOB_TRACKER = "jobTracker";
public static final String JOB_OUTPUT = "jobOutput";
public static final String JDBC_URL = "jdbcUrl";
public static final String PASSWORD = "password";
public static final String SQL_INPUT = "sqlInput";
public static final String USER_NAME = "user.name";
public static final String TASK_TYPE = "taskType";
public static final String SHELL_FILE_NAME = "shellFileName";
public static final String SHELL_FILE_PATH = "shellFilePath";
public static final String CALLBACK_ID = "callbackId";
public static final String WORKFLOW_ROOT = "workflowRoot";
public static final String START = "start";
public static final String END = "end";
}
package com.winterchen.hadoopdemo.model;
import com.winterchen.hadoopdemo.enums.FrequencyTypeEnum;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.*;
/**
* @author winterchen
* @version 1.0
* @date 2020/11/25 6:01 下午
* @description 定时调度任务请求
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ToString
@Builder
@ApiModel
public class CoordinatorRequest {
@ApiModelProperty("定时调度任务名称")
private String coordName;
@ApiModelProperty("定时调度任务文件路径")
private String coordPath;
@ApiModelProperty("频率")
private FrequencyTypeEnum frequencyType;
@ApiModelProperty("开始时间")
private String startTime;
@ApiModelProperty("结束时间")
private String endTime;
@ApiModelProperty("workflow名称")
private String wfName;
@ApiModelProperty("workflow路径")
private String wfPath;
@ApiModelProperty("回调编号")
private String callbackId;
}
package com.winterchen.hadoopdemo.model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.*;
/**
* @author winterchen
* @version 1.0
* @date 2020/11/25 5:33 下午
* @description workflow任务请求
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ToString
@Builder
@ApiModel
public class WorkflowRequest {
@ApiModelProperty("workflow名称")
private String wfName;
@ApiModelProperty("workflow路径")
private String wfPath;
@ApiModelProperty("执行的sql")
private String sql;
@ApiModelProperty("回调编号")
private String callbackId;
}
package com.winterchen.hadoopdemo.service;
import com.winterchen.hadoopdemo.enums.FrequencyTypeEnum;
import com.winterchen.hadoopdemo.model.CoordinatorRequest;
import com.winterchen.hadoopdemo.model.WorkflowRequest;
/**
* @author winterchen
* @version 1.0
* @date 2020/11/23 2:06 下午
* @description
**/
public interface OozieService {
/**
* @Author winterchen
* @Description 提交workflow任务
* @Date 6:21 下午 2020/11/25
* @Param [workflowRequest]
* @return java.lang.String
**/
String submitWorkflow(WorkflowRequest workflowRequest);
/**
* @Author winterchen
* @Description 提交coordinator任务
* @Date 6:21 下午 2020/11/25
* @Param [coordinatorRequest]
* @return java.lang.String
**/
String submitCoordinator(CoordinatorRequest coordinatorRequest);
/**
* @Author winterchen
* @Description 创建并上传sql文件至hdfs
* @Date 6:21 下午 2020/11/25
* @Param [sql, sqlPath]
* @return java.lang.String 文件地址
**/
String createSqlFileAndUpload(String sql, String sqlPath);
/**
* @Author winterchen
* @Description 创建并上传workflow任务脚本文件至hdfs
* @Date 6:22 下午 2020/11/25
* @Param [wfName, wfPath, sqlPath, callbackId]
* @return String 文件地址
**/
String createWfFileAndUpload(String wfName, String wfPath, String sqlPath, String callbackId);
/**
* @Author winterchen
* @Description 创建并上传coordinator定时任务脚本文件至hdfs
* @Date 6:23 下午 2020/11/25
* @Param [coordName, coordPath, wfPath, frequencyType, callbackId]
* @return String 文件地址
**/
String createCoordFileAndUpload(String coordName, String coordPath, String wfPath, FrequencyTypeEnum frequencyType, String callbackId);
/**
* @Author winterchen
* @Description 创建shell脚本并上传
* @Date 6:41 下午 2020/11/25
* @Param [shellFileName, shellFilePath]
* @return String 文件地址
**/
String createShellFileAndUpload(String shellFileName, String shellFilePath);
/**
* @Author winterchen
* @Description 处理回调
* @Date 6:24 下午 2020/11/25
* @Param [targetType, targetId]
* @return void
**/
void executeCallback(String executeType, String taskType, String callbackId);
/**
* @Author winterchen
* @Description 停止定时调度任务
* @Date 6:24 下午 2020/11/25
* @Param [jobId]
* @return void
**/
void killCoordinatorJob(String jobId);
}
package com.winterchen.hadoopdemo.service.impl;
import cn.hutool.core.date.DateUtil;
import com.winterchen.hadoopdemo.constants.OozieConstants;
import com.winterchen.hadoopdemo.enums.FrequencyTypeEnum;
import com.winterchen.hadoopdemo.enums.TaskTypeEnum;
import com.winterchen.hadoopdemo.model.CoordinatorRequest;
import com.winterchen.hadoopdemo.model.OozieConfig;
import com.winterchen.hadoopdemo.model.WorkflowRequest;
import com.winterchen.hadoopdemo.service.OozieService;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Properties;
/**
* @author winterchen
* @version 1.0
* @date 2020/11/23 2:06 下午
* @description
**/
@Slf4j
@Service
public class OozieServiceImpl implements OozieService {
@Autowired
private FileSystem fileSystem;
private final OozieConfig oozieConfig;
@Autowired
public OozieServiceImpl(OozieConfig oozieConfig) {
this.oozieConfig = oozieConfig;
}
@Override
public String submitWorkflow(WorkflowRequest workflowRequest) {
try {
OozieClient oozieClient = new OozieClient(oozieConfig.getUrl());
oozieClient.setDebugMode(1);
Path appPath = new Path(fileSystem.getHomeDirectory(), workflowRequest.getWfPath().concat(workflowRequest.getWfName()).concat(".xml"));
// 创建相关文件
// 创建并上传sql文件
String sqlPath = workflowRequest.getWfPath().concat("sql/".concat(workflowRequest.getWfName()).concat("-sql.q"));
createSqlFileAndUpload(workflowRequest.getSql(), sqlPath);
// 创建shell脚本
String shellFileName = workflowRequest.getWfName() + "-shell.sh";
String shellFilePath = workflowRequest.getWfPath().concat(workflowRequest.getWfName()).concat("/shell/");
String shellPath = createShellFileAndUpload(shellFileName, shellFilePath);
// 创建并上传wf脚本文件
createWfFileAndUpload(workflowRequest.getWfName(), workflowRequest.getWfPath(), sqlPath, workflowRequest.getCallbackId());
// 创建脚本任务的配置
Properties prop = oozieClient.createConfiguration();
prop.setProperty(OozieClient.APP_PATH, appPath.toString());
prop.setProperty(oozieClient.LIBPATH, oozieConfig.getOozieLibPath());
prop.setProperty(oozieClient.USE_SYSTEM_LIBPATH, String.valueOf(oozieConfig.isOozieSystemLibPath()));
/*Set Your Application Configuration*/
prop.setProperty(OozieConstants.NAME_NODE, oozieConfig.getNameNode());
prop.setProperty(OozieConstants.JOB_TRACKER,oozieConfig.getJobTracker());
Path outputPath = new Path(fileSystem.getHomeDirectory(), workflowRequest.getWfPath().concat("output/"));
prop.setProperty(OozieConstants.JOB_OUTPUT, outputPath.toString());
prop.setProperty(OozieConstants.JDBC_URL, oozieConfig.getJdbcUrl());
prop.setProperty(OozieConstants.PASSWORD, StringUtils.isEmpty(oozieConfig.getPassword()) ? "" : oozieConfig.getPassword());
prop.setProperty(OozieConstants.SQL_INPUT,workflowRequest.getWfPath().concat("sql/"));
prop.setProperty(OozieConstants.USER_NAME,"admin");
prop.setProperty(OozieConstants.TASK_TYPE, TaskTypeEnum.WORKFLOW.name());
prop.setProperty(OozieConstants.SHELL_FILE_NAME,shellFileName);
prop.setProperty(OozieConstants.SHELL_FILE_PATH, shellPath);
prop.setProperty(OozieConstants.CALLBACK_ID, workflowRequest.getCallbackId());
prop.setProperty(OozieConstants.QUEUE_NAME, oozieConfig.getQueueName());
String jobId = oozieClient.submit(prop);
oozieClient.start(jobId);
log.debug("workflow job submitted, jobId = {}", jobId);
return jobId;
} catch (OozieClientException e) {
log.error("workflow任务提交失败" ,e);
}
return null;
}
@Override
public String submitCoordinator(CoordinatorRequest coordinatorRequest) {
try {
OozieClient oozieClient = new OozieClient(oozieConfig.getUrl());
oozieClient.setDebugMode(1);
Path rootPath = new Path(fileSystem.getHomeDirectory(), coordinatorRequest.getCoordPath());
Path appPath = new Path(fileSystem.getHomeDirectory(), coordinatorRequest.getCoordPath()
.concat(coordinatorRequest.getCoordName()).concat(".xml"));
Path wf = new Path(fileSystem.getHomeDirectory(), coordinatorRequest.getWfPath());
// 创建相关文件
// 创建并上传定时调度任务脚本
createCoordFileAndUpload(coordinatorRequest.getCoordName(),coordinatorRequest.getCoordPath(),
wf.toString().concat("/").concat(coordinatorRequest.getWfName()).concat(".xml"),coordinatorRequest.getFrequencyType(), coordinatorRequest.getCallbackId());
// 创建shell脚本
String shellFileName = coordinatorRequest.getWfName() + "-shell.sh";
String shellFilePath = coordinatorRequest.getWfPath().concat(coordinatorRequest.getWfName()).concat("/shell/");
String shellPath = createShellFileAndUpload(shellFileName, shellFilePath);
// 创建脚本任务的配置
Properties prop = oozieClient.createConfiguration();
prop.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath.toString());
prop.setProperty(oozieClient.LIBPATH, oozieConfig.getOozieLibPath());
prop.setProperty(oozieClient.USE_SYSTEM_LIBPATH, String.valueOf(oozieConfig.isOozieSystemLibPath()));
prop.setProperty(OozieConstants.JOB_TRACKER,oozieConfig.getJobTracker());
prop.setProperty(OozieConstants.USER_NAME,"admin");
prop.setProperty(OozieConstants.WORKFLOW_ROOT, rootPath.toString());
String start = DateUtil.format(DateUtil.parse(coordinatorRequest.getStartTime(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd'T'HH:mm'Z'");
prop.setProperty(OozieConstants.START, start);
String end = DateUtil.format(DateUtil.parse(coordinatorRequest.getEndTime(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd'T'HH:mm'Z'");
prop.setProperty(OozieConstants.END, end);
Path outputPath = new Path(fileSystem.getHomeDirectory(), coordinatorRequest.getWfPath().concat("output/"));
prop.setProperty(OozieConstants.JOB_OUTPUT, outputPath.toString());
prop.setProperty(OozieConstants.JDBC_URL, oozieConfig.getJdbcUrl());
prop.setProperty(OozieConstants.PASSWORD, StringUtils.isEmpty(oozieConfig.getPassword()) ? "" : oozieConfig.getPassword());
prop.setProperty(OozieConstants.SQL_INPUT,coordinatorRequest.getWfPath().concat("sql/"));
prop.setProperty(OozieConstants.TASK_TYPE, TaskTypeEnum.COORDINATOR.name());
prop.setProperty(OozieConstants.SHELL_FILE_NAME,shellFileName);
prop.setProperty(OozieConstants.SHELL_FILE_PATH, shellPath);
prop.setProperty(OozieConstants.CALLBACK_ID, coordinatorRequest.getCallbackId());
prop.setProperty(OozieConstants.QUEUE_NAME, oozieConfig.getQueueName());
/*Set Your Application Configuration*/
prop.setProperty(OozieConstants.NAME_NODE, oozieConfig.getNameNode());
String jobId = oozieClient.submit(prop);
log.debug("workflow job submitted, jobId = {}", jobId);
return jobId;
} catch (OozieClientException e) {
log.error("workflow任务提交失败" ,e);
}
return null;
}
@Override
public String createSqlFileAndUpload(String sql, String sqlPath) {
Writer writer = null;
try {
Path sqlP = new Path(fileSystem.getHomeDirectory(),sqlPath);
writer = new OutputStreamWriter(fileSystem.create(sqlP));
writer.write(sql);
return sqlP.toString();
} catch (IOException e) {
log.error("创建sql文件失败", e);
} finally {
if (null != writer) {
try {
writer.close();
} catch (IOException e) {
log.error("关闭流失败", e);
}
}
}
return null;
}
@Override
public String createWfFileAndUpload(String wfName, String wfPath, String sqlFileName, String callbackId) {
Writer writer = null;
try {
Path wf = new Path(fileSystem.getHomeDirectory(),wfPath.concat(wfName).concat(".xml"));
writer = new OutputStreamWriter(fileSystem.create(wf));
String wfApp =
"<workflow-app xmlns='uri:oozie:workflow:0.4' name='" + wfName + "'>\n" +
" <start to='my-hive2-action'/>\n" +
" <action name='my-hive2-action'>\n" +
" <hive2 xmlns='uri:oozie:hive2-action:0.1'>\n" +
" <name-node>${nameNode}</name-node>\n" +
" <prepare>\n" +
" <delete path='${jobOutput}'/>\n" +
" </prepare>\n" +
" <configuration>\n" +
" <property>\n" +
" <name>mapred.compress.map.output</name>\n" +
" <value>true</value>\n" +
" </property>\n" +
" </configuration>\n" +
" <jdbc-url>${jdbcUrl}</jdbc-url>\n" +
// " <password>${password}</password>\n" +
" <script>" + sqlFileName + "</script>\n" +
" <param>InputDir=${sqlInput}</param>\n" +
" <param>OutputDir=${jobOutput}</param>\n" +
" </hive2>\n" +
" <ok to='success-action'/>\n" +
" <error to='error-action'/>\n" +
" </action>\n" +
" <!-- 成功回调 -->\n" +
" <action name='success-action'>\n" +
" <shell xmlns=\"uri:oozie:shell-action:0.2\">\n" +
" <job-tracker>${jobTracker}</job-tracker>\n" +
" <name-node>${nameNode}</name-node>\n" +
" <configuration>\n" +
" <property>\n" +
" <name>mapred.job.queue.name</name>\n" +
" <value>${queueName}</value>\n" +
" </property>\n" +
" </configuration>\n" +
" <exec>${shellFileName}</exec>\n" +
" <argument>${taskType}</argument>\n" +
" <argument>OK</argument>\n" +
" <argument>${callbackId}</argument>\n" +
" <file>${shellFilePath}#${shellFilePath}</file> <!--Copy the executable to compute node's current working directory -->\n" +
" </shell>\n" +
" <ok to='end' />\n" +
" <error to='fail' />\n" +
" </action>\n" +
" \n" +
" <!-- 失败回调 -->\n" +
" <action name='error-action'>\n" +
" <shell xmlns=\"uri:oozie:shell-action:0.2\">\n" +
" <job-tracker>${jobTracker}</job-tracker>\n" +
" <name-node>${nameNode}</name-node>\n" +
" <configuration>\n" +
" <property>\n" +
" <name>mapred.job.queue.name</name>\n" +
" <value>${queueName}</value>\n" +
" </property>\n" +
" </configuration>\n" +
" <exec>${shellFileName}</exec>\n" +
" <argument>${taskType}</argument>\n" +
" <argument>FAIL</argument>\n" +
" <argument>${callbackId}</argument>\n" +
" <file>${shellFilePath}#${shellFilePath}</file> <!--Copy the executable to compute node's current working directory -->\n" +
" </shell>\n" +
" <ok to='end' />\n" +
" <error to='fail' />\n" +
" </action>\n" +
" <kill name='fail'>\n" +
" <message>执行脚本失败</message>\n" +
" </kill>\n" +
" <end name='end'/>\n" +
"</workflow-app>";
writer.write(wfApp);
return wf.toString();
} catch (IOException e) {
log.error("创建workflow文件失败", e);
} finally {
if (null != writer) {
try {
writer.close();
} catch (IOException e) {
log.error("关闭流失败", e);
}
}
}
return null;
}
@Override
public String createCoordFileAndUpload(String coordName, String coordPath, String wfPath, FrequencyTypeEnum frequencyType, String callbackId) {
Writer writer = null;
try {
Path coord = new Path(fileSystem.getHomeDirectory(),coordPath.concat(coordName).concat(".xml"));
writer = new OutputStreamWriter(fileSystem.create(coord));
String frequency = FrequencyTypeEnum.getExpressionByName(frequencyType.name(), 1);
String wfApp =
"<coordinator-app name='" + coordName + "' frequency='" + frequency + "' start='${start}' end='${end}' timezone='Asia/Shanghai'\n" +
" xmlns='uri:oozie:coordinator:0.4'>\n" +
" <action>\n" +
" <workflow>\n" +
" <app-path>" + wfPath + "</app-path>\n" +
" </workflow>\n" +
" </action>\n" +
"</coordinator-app>";
writer.write(wfApp);
return coordName.toString();
} catch (IOException e) {
log.error("创建coordinator文件失败", e);
} finally {
if (null != writer) {
try {
writer.close();
} catch (IOException e) {
log.error("关闭流失败", e);
}
}
}
return null;
}
@Override
public String createShellFileAndUpload(String shellFileName, String shellFilePath) {
Writer writer = null;
try {
Path shellPath = new Path(fileSystem.getHomeDirectory(),shellFilePath.concat(shellFileName));
writer = new OutputStreamWriter(fileSystem.create(shellPath));
String shell =
"#!/bin/bash\n" +
"echo 'curl " + oozieConfig.getCallbackUrl() + "';\n" +
"curl -X GET " + oozieConfig.getCallbackUrl();
writer.write(shell);
return shellPath.toString();
} catch (IOException e) {
log.error("创建shell文件失败", e);
} finally {
if (null != writer) {
try {
writer.close();
} catch (IOException e) {
log.error("关闭流失败", e);
}
}
}
return null;
}
@Override
public void executeCallback(String executeType, String taskType, String callbackId) {
// TODO
log.info("回调处理,executeType={}, taskType={}, callbackId={}", executeType, taskType, callbackId);
}
@Override
public void killCoordinatorJob(String jobId) {
OozieClient oozieClient = new OozieClient(oozieConfig.getUrl());
oozieClient.setDebugMode(1);
try {
oozieClient.kill(jobId);
} catch (OozieClientException e) {
log.error("停止定时任务失败", e);
}
}
}
注意:上面调用的hdfs的接口是本文开头提到的前提条件,请到相应的文章集成hdfs,因为这是必须的,需要将脚本文件上传到hdfs才可以在oozie中引用到脚本文件。
控制器
package com.winterchen.hadoopdemo.controller;
import com.winterchen.hadoopdemo.model.CoordinatorRequest;
import com.winterchen.hadoopdemo.model.WorkflowRequest;
import com.winterchen.hadoopdemo.service.OozieService;
import com.winterchen.hadoopdemo.utils.APIResponse;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @author winterchen
* @version 1.0
* @date 2020/11/25 11:10 上午
* @description TODO
**/
@Api(tags = "oozie调度任务")
@RequestMapping("/oozie")
@RestController
public class OozieController {
@Autowired
private OozieService oozieService;
@ApiOperation("提交workflow任务")
@PostMapping("/job/workflow")
public APIResponse<String> submitWorkflowJob(
@RequestBody WorkflowRequest workflowRequest
) {
return APIResponse.success(oozieService.submitWorkflow(workflowRequest));
}
@ApiOperation("提交coordinator定时调度任务")
@PostMapping("/job/coordinator")
public APIResponse<String> submitCoordJob(
@RequestBody CoordinatorRequest coordinatorRequest
) {
return APIResponse.success(oozieService.submitCoordinator(coordinatorRequest));
}
@ApiOperation("停止定时调度任务")
@DeleteMapping("/{jobId}")
public APIResponse<?> killCoordJob(
@PathVariable("jobId")
String jobId
) {
oozieService.killCoordinatorJob(jobId);
return APIResponse.success();
}
@ApiOperation("处理回调")
@GetMapping("/callback")
public APIResponse<?> executeCallback(
@ApiParam(name = "executeType", value = "处理类型", required = true)
@RequestParam(name = "executeType", required = true)
String executeType,
@ApiParam(name = "taskType", value = "任务类型", required = true)
@RequestParam(name = "taskType", required = true)
String taskType,
@ApiParam(name = "callbackId", value = "回调编号", required = true)
@RequestParam(name = "callbackId", required = true)
String callbackId
) {
oozieService.executeCallback(executeType, taskType, callbackId);
return APIResponse.success();
}
}
上面实现的主要功能有:提交workflow和coordinator任务,停止任务等功能;
处理回调并不是必须的,可以根据业务要求来实现各种个性化功能;
WinterChenS/springboot-learning-experience