控制层:XxlJobController.java
package com.cwp.data.job.controller;
import com.cwp.data.intelligence.common.exception.RRException;
import com.cwp.data.intelligence.common.utils.R;
import com.cwp.data.job.api.XxlJobApi;
import com.cwp.data.job.dto.SaveXxlJobDto;
import com.cwp.data.job.dto.UpdateXxlJobDto;
import com.cwp.data.job.util.XxlJobUtil;
import cn.hutool.http.HttpStatus;
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
/**
* @ClassName XxlJobController
* @Date 2021/4/6 11:40
*/
@Slf4j
@RestController
public class XxlJobController implements XxlJobApi {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String executorAppname;
@Value("${xxl.job.admin.username:admin}")
private String username;
@Value("${xxl.job.admin.password:admin}")
private String password;
@Value("${xxl.job.admin.jobGroup:1")
private Integer jobGroup;
@ApiOperation(value = "添加jobInfo并启动", httpMethod = "POST")
@Override
public R addXxlJob(@RequestBody SaveXxlJobDto saveXxlJobDto) {
/* Map<String, Object> paramMap = new HashMap<>();
paramMap.put("jobGroup", 2);
paramMap.put("jobDesc", "这是测试任务");
paramMap.put("executorRouteStrategy", "FIRST");
paramMap.put("cronGen_display", "0/6 * * * * ?");
paramMap.put("jobCron", "0/6 * * * * ?");
paramMap.put("glueType", "BEAN");
paramMap.put("executorHandler", "messageJob"); // 此处hander需提前在项目中定义
paramMap.put("executorBlockStrategy", "SERIAL_EXECUTION");
paramMap.put("executorTimeout", 0);
paramMap.put("executorFailRetryCount", 1);
paramMap.put("author", "admin");
paramMap.put("glueRemark", "GLUE代码初始化");
paramMap.put("triggerStatus", 1);*/
//设置执行器组
saveXxlJobDto.setJobGroup(jobGroup);
String jsonStr=JSONObject.toJSONString(saveXxlJobDto);
JSONObject jsonObjectParam= JSONObject.parseObject(jsonStr);
JSONObject response = null;
try {
log.info("新增任务入参={}",jsonStr);
String cookie= getLoginCookie();
log.info("登录cookie={}",cookie);
response = XxlJobUtil.addJob(adminAddresses,jsonObjectParam);
} catch (IOException e) {
throw new RRException("调用xxl-job-admin-add接口失败!"+e.getMessage());
}
if (response.containsKey("code") && HttpStatus.HTTP_OK == (Integer) response.get("code")) {
String jobIdStr=(String)response.get("content");
Integer jobId=Integer.valueOf(jobIdStr);
return R.okWithData(jobId);
} else {
return R.error(response.get("msg")!=null?response.get("msg").toString():"");
}
}
@ApiOperation(value = "更新jobInfo", httpMethod = "POST")
@Override
public R updateXxlJob(@RequestBody UpdateXxlJobDto updateXxlJobDto) {
//设置执行器组
updateXxlJobDto.setJobGroup(jobGroup);
String jsonStr=JSONObject.toJSONString(updateXxlJobDto);
JSONObject jsonObjectParam= JSONObject.parseObject(jsonStr);
JSONObject response = null;
try {
log.info("新增任务入参={}",jsonStr);
String cookie= getLoginCookie();
log.info("登录cookie={}",cookie);
response = XxlJobUtil.updateJob(adminAddresses,jsonObjectParam);
} catch (IOException e) {
throw new RRException("调用xxl-job-admin-update接口失败!"+e.getMessage());
}
if (response.containsKey("code") && HttpStatus.HTTP_OK == (Integer) response.get("code")) {
return R.ok();
} else {
return R.error(response.get("msg")!=null?response.get("msg").toString():"");
}
}
/**
* 删除任务
*
* @param id
* @return
* @throws IOException
*/
@Override
public R delete(@RequestParam("id") Integer id) {
try {
String cookie= getLoginCookie();
log.info("登录cookie={}",cookie);
JSONObject response = XxlJobUtil.deleteJob(adminAddresses, id);
if (response.containsKey("code") &&HttpStatus.HTTP_OK == (Integer) response.get("code")) {
return R.ok();
} else {
return R.error(response.get("msg")!=null?response.get("msg").toString():"");
}
} catch (Exception e) {
return R.error(e.getMessage());
}
}
/**
* 开始任务
*
* @param id
* @return
* @throws IOException
*/
@Override
public R start(@RequestParam("id") Integer id) {
try {
String cookie= getLoginCookie();
log.info("登录cookie={}",cookie);
JSONObject response = XxlJobUtil.startJob(adminAddresses, id);
if (response.containsKey("code") &&HttpStatus.HTTP_OK == (Integer) response.get("code")) {
return R.ok();
} else {
return R.error(response.get("msg")!=null?response.get("msg").toString():"");
}
} catch (Exception e) {
return R.error(e.getMessage());
}
}
/**
* 挂起任务
*
* @param id
* @return
* @throws IOException
*/
@Override
public R stop(@RequestParam("id") Integer id) {
JSONObject response =null;
try {
String cookie= getLoginCookie();
log.info("登录cookie={}",cookie);
response=XxlJobUtil.stopJob(adminAddresses, id);
} catch (IOException e) {
throw new RRException("调用xxl-job-admin-stop接口失败!"+e.getMessage());
}
if (response.containsKey("code") && HttpStatus.HTTP_OK == (Integer) response.get("code")) {
return R.ok();
} else {
return R.error(response.get("msg")!=null?response.get("msg").toString():"");
}
}
/**
* 登陆
*
* @param userName
* @param password
* @return
* @throws IOException
*/
@RequestMapping(value = "/login", method = RequestMethod.GET)
public R login(@RequestParam("userName") String userName,@RequestParam("password") String password) {
try {
String cookie = XxlJobUtil.login(adminAddresses, userName, password);
if (StringUtils.isNotBlank(cookie)) {
return R.ok();
} else {
throw new Exception("调用xxl-job-admin-login接口失败!");
}
} catch (Exception e) {
return R.error(e.getMessage());
}
}
/**
* 根据xxl-appname获取对应id
*
* @return
* @throws IOException
*/
@Override
public R getAppNameIdByAppname() {
JSONObject response =null;
try {
String cookie= getLoginCookie();
log.info("登录cookie={}",cookie);
response = XxlJobUtil.getAppNameIdByAppname(adminAddresses,executorAppname);
} catch (IOException e) {
throw new RRException("调用xxl-job-admin-getAppNameIdByAppname接口失败!"+e.getMessage());
}
if (response.containsKey("code") && HttpStatus.HTTP_OK == (Integer) response.get("code")) {
return R.ok();
} else {
return R.error(response.get("msg")!=null?response.get("msg").toString():"");
}
}
/**
* @Description 获取登录cookie
* @Author chengweiping
* @Date 2021/4/13 16:48
*/
public String getLoginCookie() {
String cookie= XxlJobUtil.getCookie();
try {
if (StringUtils.isNotBlank(cookie)) {
return cookie;
}
cookie=XxlJobUtil.login(adminAddresses, username, password);
XxlJobUtil.setCookie(cookie);
} catch (IOException e) {
e.printStackTrace();
throw new RRException(e.getMessage());
}
return cookie;
}
}
控制层api接口:XxlJobApi.java
package com.cwp.data.job.api;
import com.cwp.data.intelligence.common.utils.R;
import com.cwp.data.job.dto.SaveXxlJobDto;
import com.cwp.data.job.dto.UpdateXxlJobDto;
import org.springframework.web.bind.annotation.*;
/**
* @ClassName XxlJobApi
* @Description TODO
* @Date 2020/12/23 10:46
*/
public interface XxlJobApi {
/**
* @Description 新增定时任务
* @Author chengweiping
* @Date 2021/4/14 9:46
*/
@PostMapping("/xxljob/save")
R addXxlJob(@RequestBody SaveXxlJobDto saveXxlJobDto);
/**
* 更新定时任务
*/
@PostMapping("/xxljob/update")
R updateXxlJob(@RequestBody UpdateXxlJobDto updateXxlJobDto);
/**
* @Description 删除定时任务
* @Author chengweiping
* @Date 2021/4/14 9:46
*/
@RequestMapping(value = "/xxljob/delete", method = RequestMethod.GET)
R delete(@RequestParam("id") Integer id);
/**
* 启动定时任务
*/
@RequestMapping(value = "/xxljob/start", method = RequestMethod.GET)
R start(@RequestParam("id") Integer id);
/**
* 停止定时任务
*/
@RequestMapping(value = "/xxljob/stop", method = RequestMethod.GET)
R stop(@RequestParam("id") Integer id);
/**
* 根据启动器名字获取启动器ID
*/
@RequestMapping(value = "/xxljob/getAppNameIdByAppname", method = RequestMethod.GET)
R getAppNameIdByAppname();
}
新增任务传输DTO: SaveXxlJobDto.java
package com.cwp.data.job.dto;
import lombok.Data;
import java.io.Serializable;
/**
* @ClassName SaveXxlJobDto
* @Date 2021/4/13 17:40
*/
@Data
public class SaveXxlJobDto implements Serializable {
/**
* 执行器ID
*/
private Integer jobGroup;
/**
* 任务描述
*/
private String jobDesc;
/**
* 执行策略
*/
private String executorRouteStrategy;
/**
* 表达式显示
*/
private String cronGen_display;
/**
* 任务表达式
*/
private String jobCron;
/**
* 运行模式
*/
private String glueType;
/**
* 此处hander需提前在项目中定义
*/
private String executorHandler;
/**
* 失败重试次数
*/
private Integer executorFailRetryCount;
/**
* 负责人
*/
private String author;
private String glueRemark;
/**
* 触发状态
*/
private Integer triggerStatus;
/**
* 执行参数
*/
private String executorParam;
/**
* SERIAL_EXECUTION 阻塞处理策略
*/
private String executorBlockStrategy;
}
更新任务DTO: UpdateXxlJobDto.java
package com.cwp.data.job.dto;
import lombok.Data;
import java.io.Serializable;
/**
* @ClassName SaveXxlJobDto
* @Description TODO
* @Date 2021/4/13 17:40
*/
@Data
public class UpdateXxlJobDto extends SaveXxlJobDto implements Serializable {
/**
* @Description 任务ID
* @Author chengweiping
* @Date 2021/4/13 18:50
*/
private Integer id;
}
xxl-job工具类:XxlJobUtil.java
package com.cwp.data.job.util;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.httpclient.Cookie;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@Slf4j
public class XxlJobUtil {
private static String cookie="";
/**
* 新增/编辑任务
* @param url
* @param requestInfo
* @return
* @throws HttpException
* @throws IOException
*/
public static JSONObject addJob(String url, JSONObject requestInfo) throws HttpException, IOException {
String path = "/jobinfo/add";
String targetPath=url+path;
cookie=getCookie();
HttpResponse response = HttpRequest.post(targetPath).form(requestInfo).cookie(cookie).execute();
JSONObject jsonObject = JSON.parseObject(response.body());
return jsonObject;
}
public static JSONObject updateJob(String url, JSONObject requestInfo) throws HttpException, IOException {
String path = "/jobinfo/update";
String targetPath=url+path;
cookie=getCookie();
HttpResponse response = HttpRequest.post(targetPath).form(requestInfo).cookie(cookie).execute();
JSONObject jsonObject = JSON.parseObject(response.body());
return jsonObject;
}
/**
* 删除任务
* @param url
* @param id
* @return
* @throws HttpException
* @throws IOException
*/
public static JSONObject deleteJob(String url,int id) throws HttpException, IOException {
String path = "/jobinfo/remove?id=" + id;
String targetPath=url+path;
cookie=getCookie();
HttpResponse response = HttpRequest.post(targetPath).cookie(cookie).execute();
JSONObject jsonObject = JSON.parseObject(response.body());
return jsonObject;
}
/**
* 开始任务
* @param url
* @param id
* @return
* @throws HttpException
* @throws IOException
*/
public static JSONObject startJob(String url,int id) throws HttpException, IOException {
String path = "/jobinfo/start?id="+id;
String targetPath=url+path;
cookie=getCookie();
HttpResponse response = HttpRequest.post(targetPath).cookie(cookie).execute();
JSONObject jsonObject = JSON.parseObject(response.body());
return jsonObject;
}
/**
* 停止任务
* @param url
* @param id
* @return
* @throws HttpException
* @throws IOException
*/
public static JSONObject stopJob(String url,int id) throws HttpException, IOException {
String path = "/jobinfo/stop?id="+id;
String targetPath=url+path;
cookie=getCookie();
HttpResponse response = HttpRequest.post(targetPath).cookie(cookie).execute();
JSONObject jsonObject = JSON.parseObject(response.body());
return jsonObject;
}
/**
* 根据xxl-appname获取对应id
* @param url
* @param appnameParam
* @return
* @throws HttpException
* @throws IOException
*/
public static JSONObject getAppNameIdByAppname(String url,String appnameParam) throws HttpException, IOException {
String path = "/jobgroup/getAppNameIdByAppname?appnameParam="+appnameParam;
String targetPath=url+path;
cookie=getCookie();
HttpResponse response = HttpRequest.post(targetPath).cookie(cookie).execute();
JSONObject jsonObject = JSON.parseObject(response.body());
return jsonObject;
}
public static JSONObject doGet(String url,String path) throws HttpException, IOException {
String targetUrl = url + path;
HttpClient httpClient = new HttpClient();
HttpMethod get = new GetMethod(targetUrl);
get.setRequestHeader("cookie", cookie);
httpClient.executeMethod(get);
JSONObject result = new JSONObject();
result = getJsonObject(get, result);
return result;
}
private static JSONObject getJsonObject(HttpMethod get, JSONObject result) throws IOException {
InputStream inputStream = get.getResponseBodyAsStream();
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
StringBuffer stringBuffer = new StringBuffer();
String str = "";
while ((str = br.readLine()) != null) {
stringBuffer.append(str);
}
if (get.getStatusCode() == 200) {
/**
* 使用此方式会出现
* Going to buffer response body of large or unknown size. Using getResponseBodyAsStream instead is recommended.
* 异常
* String responseBodyAsString = get.getResponseBodyAsString();
* result = JSONObject.parseObject(responseBodyAsString);*/
result = JSONObject.parseObject(stringBuffer.toString());
} else {
try {
// result = JSONObject.parseObject(get.getResponseBodyAsString());
result = JSONObject.parseObject(stringBuffer.toString());
} catch (Exception e) {
result.put("error", stringBuffer.toString());
}
}
return result;
}
public static String login(String url, String userName, String password) throws HttpException, IOException {
String path = "/login?userName="+userName+"&password="+password;
String targetUrl = url + path;
HttpClient httpClient = new HttpClient();
PostMethod get = new PostMethod(targetUrl);
httpClient.executeMethod(get);
if (get.getStatusCode() == 200) {
Cookie[] cookies = httpClient.getState().getCookies();
StringBuffer tmpcookies = new StringBuffer();
for (Cookie c : cookies) {
tmpcookies.append(c.toString() + ";");
}
cookie = tmpcookies.toString();
} else {
try {
cookie = "";
} catch (Exception e) {
cookie="";
}
}
return cookie;
}
/**
* @Description 获取登录cookie
* @Author chengweiping
* @Date 2021/4/13 16:39
*/
public static String getCookie() {
return cookie;
}
public static void setCookie(String cookie) {
XxlJobUtil.cookie = cookie;
}
}
远程Feign接口:XxlJobFeignApi.java
package com.cwp.data.manager.feign;
import com.cwp.data.development.api.SendEmailApi;
import com.cwp.data.intelligence.common.config.FeignConfiguration;
import com.cwp.data.intelligence.common.constants.MicroServiceConstant;
import com.cwp.data.job.api.XxlJobApi;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
/**
* @ClassName SendEmailFeignApi
* @Date 2020/12/24 15:33
*/
@Component
@FeignClient(value = MicroServiceConstant.DATA_JOB_SERVICE,
path = MicroServiceConstant.DATA_JOB_PATH,
configuration = FeignConfiguration.class)
public interface XxlJobFeignApi extends XxlJobApi {
}
常量类:MicroServiceConstant.java
package com.cwp.data.intelligence.common.constants;
public class MicroServiceConstant {
public final static String DATA_JOB_SERVICE="data-job-web";
public final static String DATA_JOB_PATH="data-job";
}
Feign配置类转发请求头:FeignConfiguration.java
package com.cwp.data.intelligence.common.config;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
/**
* @ClassName FeignConfiguration
* @Description 设置Feign转发请求头
* @Date 2020/12/24 14:16
*/
@Configuration
@ConditionalOnBean(value =RequestInterceptor.class)
public class FeignConfiguration implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
if (null != attributes) {
HttpServletRequest request = attributes.getRequest();
if (null != request) {
Enumeration<String> headerNames = request.getHeaderNames();
if (headerNames != null) {
while (headerNames.hasMoreElements()) {
String name = headerNames.nextElement();
String values = request.getHeader(name);
requestTemplate.header(name, values);
}
}
}
}
}
}
返回类:R.java
package com.cwp.data.intelligence.common.utils;
import java.util.HashMap;
import java.util.Map;
public class R extends HashMap<String, Object> {
private static final long serialVersionUID = 1L;
public R() {
put("code", 0);
put("msg", "success");
}
public static R error() {
return error(500, "未知异常,请联系管理员");
}
public static R error(String msg) {
return error(500, msg);
}
public static R error(int code, String msg) {
R r = new R();
r.put("code", code);
r.put("msg", msg);
return r;
}
public static R ok(String msg) {
R r = new R();
r.put("msg", msg);
return r;
}
public static R ok(Map<String, Object> map) {
R r = new R();
r.putAll(map);
return r;
}
public static R okWithData(Object data) {
R r = new R();
r.put("data", data);
return r;
}
public static R okWithPage(Object page) {
R r = new R();
r.put("page", page);
return r;
}
public static R ok() {
return new R();
}
public R put(String key, Object value) {
super.put(key, value);
return this;
}
public boolean isOk() {
if (get("code") != null) {
if (Integer.parseInt(get("code").toString()) == 0) {
return true;
}
}
return false;
}
}
需要的依赖
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.7</version>
</dependency>