当前位置: 首页 > 工具软件 > xxl-api > 使用案例 >

整合xxl-job-admin动态添加xxl-job定时任务

宋铭
2023-12-01

控制层:XxlJobController.java 

package com.cwp.data.job.controller;

import com.cwp.data.intelligence.common.exception.RRException;
import com.cwp.data.intelligence.common.utils.R;
import com.cwp.data.job.api.XxlJobApi;
import com.cwp.data.job.dto.SaveXxlJobDto;
import com.cwp.data.job.dto.UpdateXxlJobDto;
import com.cwp.data.job.util.XxlJobUtil;
import cn.hutool.http.HttpStatus;
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;

/**
 * @ClassName XxlJobController
 * @Date 2021/4/6 11:40
 */
@Slf4j
@RestController
public class XxlJobController implements XxlJobApi {


    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.executor.appname}")
    private String executorAppname;

    @Value("${xxl.job.admin.username:admin}")
    private String username;
    @Value("${xxl.job.admin.password:admin}")
    private String password;

    @Value("${xxl.job.admin.jobGroup:1")
    private Integer jobGroup;


    @ApiOperation(value = "添加jobInfo并启动", httpMethod = "POST")
    @Override
    public R addXxlJob(@RequestBody SaveXxlJobDto saveXxlJobDto) {
       /* Map<String, Object> paramMap = new HashMap<>();
        paramMap.put("jobGroup", 2);
        paramMap.put("jobDesc", "这是测试任务");
        paramMap.put("executorRouteStrategy", "FIRST");
        paramMap.put("cronGen_display", "0/6 * * * * ?");
        paramMap.put("jobCron", "0/6 * * * * ?");
        paramMap.put("glueType", "BEAN");
        paramMap.put("executorHandler", "messageJob"); // 此处hander需提前在项目中定义
        paramMap.put("executorBlockStrategy", "SERIAL_EXECUTION");
        paramMap.put("executorTimeout", 0);
        paramMap.put("executorFailRetryCount", 1);
        paramMap.put("author", "admin");
        paramMap.put("glueRemark", "GLUE代码初始化");
        paramMap.put("triggerStatus", 1);*/
        //设置执行器组
        saveXxlJobDto.setJobGroup(jobGroup);
        String jsonStr=JSONObject.toJSONString(saveXxlJobDto);
        JSONObject jsonObjectParam= JSONObject.parseObject(jsonStr);
        JSONObject response = null;
        try {
            log.info("新增任务入参={}",jsonStr);
            String cookie= getLoginCookie();
            log.info("登录cookie={}",cookie);
            response = XxlJobUtil.addJob(adminAddresses,jsonObjectParam);
        } catch (IOException e) {
            throw new RRException("调用xxl-job-admin-add接口失败!"+e.getMessage());
        }
        if (response.containsKey("code") && HttpStatus.HTTP_OK == (Integer) response.get("code")) {
            String  jobIdStr=(String)response.get("content");
            Integer jobId=Integer.valueOf(jobIdStr);
            return R.okWithData(jobId);
        } else {
            return  R.error(response.get("msg")!=null?response.get("msg").toString():"");
        }


    }

    @ApiOperation(value = "更新jobInfo", httpMethod = "POST")
    @Override
    public R  updateXxlJob(@RequestBody UpdateXxlJobDto updateXxlJobDto) {
        //设置执行器组
        updateXxlJobDto.setJobGroup(jobGroup);
        String jsonStr=JSONObject.toJSONString(updateXxlJobDto);
        JSONObject jsonObjectParam= JSONObject.parseObject(jsonStr);
        JSONObject response = null;
        try {
            log.info("新增任务入参={}",jsonStr);
            String cookie= getLoginCookie();
            log.info("登录cookie={}",cookie);
            response = XxlJobUtil.updateJob(adminAddresses,jsonObjectParam);
        } catch (IOException e) {
            throw new RRException("调用xxl-job-admin-update接口失败!"+e.getMessage());
        }
        if (response.containsKey("code") && HttpStatus.HTTP_OK == (Integer) response.get("code")) {
            return R.ok();
        } else {
            return  R.error(response.get("msg")!=null?response.get("msg").toString():"");
        }


    }


    /**
     * 删除任务
     *
     * @param id
     * @return
     * @throws IOException
     */
    @Override
    public R delete(@RequestParam("id") Integer id) {
        try {
            String cookie= getLoginCookie();
            log.info("登录cookie={}",cookie);
            JSONObject response = XxlJobUtil.deleteJob(adminAddresses, id);
            if (response.containsKey("code") &&HttpStatus.HTTP_OK == (Integer) response.get("code")) {
                return R.ok();
            } else {
                return  R.error(response.get("msg")!=null?response.get("msg").toString():"");
            }
        } catch (Exception e) {
            return R.error(e.getMessage());
        }

    }

    /**
     * 开始任务
     *
     * @param id
     * @return
     * @throws IOException
     */
    @Override
    public R start(@RequestParam("id") Integer id) {
        try {
            String cookie= getLoginCookie();
            log.info("登录cookie={}",cookie);
            JSONObject response = XxlJobUtil.startJob(adminAddresses, id);
            if (response.containsKey("code") &&HttpStatus.HTTP_OK == (Integer) response.get("code")) {
                return R.ok();
            } else {
                return  R.error(response.get("msg")!=null?response.get("msg").toString():"");
            }
        } catch (Exception e) {
            return R.error(e.getMessage());
        }




    }

    /**
     * 挂起任务
     *
     * @param id
     * @return
     * @throws IOException
     */
    @Override
    public R stop(@RequestParam("id") Integer id) {
        JSONObject response =null;
        try {
            String cookie= getLoginCookie();
            log.info("登录cookie={}",cookie);
            response=XxlJobUtil.stopJob(adminAddresses, id);
        } catch (IOException e) {
            throw new RRException("调用xxl-job-admin-stop接口失败!"+e.getMessage());
        }
        if (response.containsKey("code") && HttpStatus.HTTP_OK == (Integer) response.get("code")) {
            return R.ok();
        } else {
            return  R.error(response.get("msg")!=null?response.get("msg").toString():"");
        }



    }

    /**
     * 登陆
     *
     * @param userName
     * @param password
     * @return
     * @throws IOException
     */
    @RequestMapping(value = "/login", method = RequestMethod.GET)
    public R login(@RequestParam("userName") String userName,@RequestParam("password") String password) {
        try {
            String cookie = XxlJobUtil.login(adminAddresses, userName, password);
            if (StringUtils.isNotBlank(cookie)) {
                return R.ok();
            } else {
                throw new Exception("调用xxl-job-admin-login接口失败!");
            }
        } catch (Exception e) {
            return R.error(e.getMessage());
        }
    }
    /**
     * 根据xxl-appname获取对应id
     *
     * @return
     * @throws IOException
     */
    @Override
    public R getAppNameIdByAppname() {

        JSONObject response =null;
        try {
            String cookie= getLoginCookie();
            log.info("登录cookie={}",cookie);
            response = XxlJobUtil.getAppNameIdByAppname(adminAddresses,executorAppname);
        } catch (IOException e) {
            throw new RRException("调用xxl-job-admin-getAppNameIdByAppname接口失败!"+e.getMessage());
        }
        if (response.containsKey("code") && HttpStatus.HTTP_OK == (Integer) response.get("code")) {
            return R.ok();
        } else {
            return  R.error(response.get("msg")!=null?response.get("msg").toString():"");
        }

    }

    /**
    * @Description  获取登录cookie
    * @Author  chengweiping
    * @Date   2021/4/13 16:48
    */
    public  String getLoginCookie() {
        String cookie= XxlJobUtil.getCookie();
        try {
            if (StringUtils.isNotBlank(cookie)) {
                return cookie;
            }
            cookie=XxlJobUtil.login(adminAddresses, username, password);
            XxlJobUtil.setCookie(cookie);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RRException(e.getMessage());
        }
        return cookie;
    }

}

控制层api接口:XxlJobApi.java

package com.cwp.data.job.api;

import com.cwp.data.intelligence.common.utils.R;
import com.cwp.data.job.dto.SaveXxlJobDto;
import com.cwp.data.job.dto.UpdateXxlJobDto;
import org.springframework.web.bind.annotation.*;

/**
 * @ClassName XxlJobApi
 * @Description TODO
 * @Date 2020/12/23 10:46
 */

public interface XxlJobApi {


    /**
    * @Description 新增定时任务
    * @Author  chengweiping
    * @Date   2021/4/14 9:46
    */
    @PostMapping("/xxljob/save")
    R addXxlJob(@RequestBody SaveXxlJobDto saveXxlJobDto);


    /**
     *  更新定时任务
     */
    @PostMapping("/xxljob/update")
    R  updateXxlJob(@RequestBody UpdateXxlJobDto updateXxlJobDto);

    /**
    * @Description  删除定时任务
    * @Author  chengweiping
    * @Date   2021/4/14 9:46
    */
    @RequestMapping(value = "/xxljob/delete", method = RequestMethod.GET)
    R delete(@RequestParam("id") Integer id);

    /**
     *  启动定时任务
     */
    @RequestMapping(value = "/xxljob/start", method = RequestMethod.GET)
    R start(@RequestParam("id") Integer id);

    /**
     * 停止定时任务
     */
    @RequestMapping(value = "/xxljob/stop", method = RequestMethod.GET)
    R stop(@RequestParam("id") Integer id);

    /**
     *  根据启动器名字获取启动器ID
     */
    @RequestMapping(value = "/xxljob/getAppNameIdByAppname", method = RequestMethod.GET)
    R getAppNameIdByAppname();
}

新增任务传输DTO: SaveXxlJobDto.java 

package com.cwp.data.job.dto;

import lombok.Data;

import java.io.Serializable;

/**
 * @ClassName SaveXxlJobDto
 * @Date 2021/4/13 17:40
 */
@Data
public class SaveXxlJobDto implements Serializable {

    /**
     * 执行器ID
     */
    private  Integer jobGroup;

    /**
     *  任务描述
     */
    private  String jobDesc;

    /**
     *  执行策略
     */
    private  String executorRouteStrategy;

    /**
     *  表达式显示
     */
    private   String cronGen_display;


    /**
     *  任务表达式
     */
    private   String jobCron;


    /**
     *  运行模式
     */
    private  String glueType;


    /**
     *  此处hander需提前在项目中定义
     */
    private  String executorHandler;

    /**
     *  失败重试次数
     */
    private Integer executorFailRetryCount;

    /**
     *  负责人
     */
    private  String author;

    private  String glueRemark;

    /**
     *  触发状态
     */
    private  Integer triggerStatus;

    /**
     *  执行参数
     */
    private  String executorParam;


    /**
     *  SERIAL_EXECUTION 阻塞处理策略
     */
    private  String executorBlockStrategy;



}

更新任务DTO:  UpdateXxlJobDto.java

package com.cwp.data.job.dto;

import lombok.Data;

import java.io.Serializable;

/**
 * @ClassName SaveXxlJobDto
 * @Description TODO
 * @Date 2021/4/13 17:40
 */
@Data
public class UpdateXxlJobDto extends SaveXxlJobDto implements Serializable {


    /**
    * @Description 任务ID
    * @Author  chengweiping
    * @Date   2021/4/13 18:50
    */
    private  Integer id;







}

xxl-job工具类:XxlJobUtil.java

package com.cwp.data.job.util;

import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.httpclient.Cookie;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;


@Slf4j
public class XxlJobUtil {




    private static String cookie="";

    /**
     * 新增/编辑任务
     * @param url
     * @param requestInfo
     * @return
     * @throws HttpException
     * @throws IOException
     */
    public static JSONObject addJob(String url, JSONObject requestInfo) throws HttpException, IOException {
        String path = "/jobinfo/add";
        String targetPath=url+path;
        cookie=getCookie();
        HttpResponse response = HttpRequest.post(targetPath).form(requestInfo).cookie(cookie).execute();
        JSONObject jsonObject = JSON.parseObject(response.body());
        return jsonObject;
    }



    public static JSONObject updateJob(String url, JSONObject requestInfo) throws HttpException, IOException {
        String path = "/jobinfo/update";
        String targetPath=url+path;
        cookie=getCookie();
        HttpResponse response = HttpRequest.post(targetPath).form(requestInfo).cookie(cookie).execute();
        JSONObject jsonObject = JSON.parseObject(response.body());
        return jsonObject;
    }

    /**
     * 删除任务
     * @param url
     * @param id
     * @return
     * @throws HttpException
     * @throws IOException
     */
    public static JSONObject deleteJob(String url,int id) throws HttpException, IOException {
        String path = "/jobinfo/remove?id=" + id;
        String targetPath=url+path;
        cookie=getCookie();
        HttpResponse response = HttpRequest.post(targetPath).cookie(cookie).execute();
        JSONObject jsonObject = JSON.parseObject(response.body());
        return jsonObject;
    }

    /**
     * 开始任务
     * @param url
     * @param id
     * @return
     * @throws HttpException
     * @throws IOException
     */
    public static JSONObject startJob(String url,int id) throws HttpException, IOException {
        String path = "/jobinfo/start?id="+id;
        String targetPath=url+path;
        cookie=getCookie();
        HttpResponse response = HttpRequest.post(targetPath).cookie(cookie).execute();
        JSONObject jsonObject = JSON.parseObject(response.body());
        return jsonObject;
    }

    /**
     * 停止任务
     * @param url
     * @param id
     * @return
     * @throws HttpException
     * @throws IOException
     */
    public static JSONObject stopJob(String url,int id) throws HttpException, IOException {
        String path = "/jobinfo/stop?id="+id;
        String targetPath=url+path;
        cookie=getCookie();
        HttpResponse response = HttpRequest.post(targetPath).cookie(cookie).execute();
        JSONObject jsonObject = JSON.parseObject(response.body());
        return jsonObject;
    }

    /**
     * 根据xxl-appname获取对应id
     * @param url
     * @param appnameParam
     * @return
     * @throws HttpException
     * @throws IOException
     */
    public static JSONObject getAppNameIdByAppname(String url,String appnameParam) throws HttpException, IOException {
        String path = "/jobgroup/getAppNameIdByAppname?appnameParam="+appnameParam;
        String targetPath=url+path;
        cookie=getCookie();
        HttpResponse response = HttpRequest.post(targetPath).cookie(cookie).execute();
        JSONObject jsonObject = JSON.parseObject(response.body());
        return jsonObject;
    }

    public static JSONObject doGet(String url,String path) throws HttpException, IOException {
        String targetUrl = url + path;
        HttpClient httpClient = new HttpClient();
        HttpMethod get = new GetMethod(targetUrl);
        get.setRequestHeader("cookie", cookie);
        httpClient.executeMethod(get);
        JSONObject result = new JSONObject();
        result = getJsonObject(get, result);
        return result;
    }

    private static JSONObject getJsonObject(HttpMethod get, JSONObject result) throws IOException {
        InputStream inputStream = get.getResponseBodyAsStream();
        BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
        StringBuffer stringBuffer = new StringBuffer();
        String str = "";
        while ((str = br.readLine()) != null) {
            stringBuffer.append(str);
        }
        if (get.getStatusCode() == 200) {
            /**
             *  使用此方式会出现
             *  Going to buffer response body of large or unknown size. Using getResponseBodyAsStream instead is recommended.
             *  异常
             *  String responseBodyAsString = get.getResponseBodyAsString();
             *  result = JSONObject.parseObject(responseBodyAsString);*/
            result = JSONObject.parseObject(stringBuffer.toString());
        } else {
            try {
//                result = JSONObject.parseObject(get.getResponseBodyAsString());
                result = JSONObject.parseObject(stringBuffer.toString());
            } catch (Exception e) {
                result.put("error", stringBuffer.toString());
            }
        }
        return result;
    }


    public static String login(String url, String userName, String password) throws HttpException, IOException {
        String path = "/login?userName="+userName+"&password="+password;
        String targetUrl = url + path;
        HttpClient httpClient = new HttpClient();
        PostMethod get = new PostMethod(targetUrl);
        httpClient.executeMethod(get);
        if (get.getStatusCode() == 200) {
            Cookie[] cookies = httpClient.getState().getCookies();
            StringBuffer tmpcookies = new StringBuffer();
            for (Cookie c : cookies) {
                tmpcookies.append(c.toString() + ";");
            }
            cookie = tmpcookies.toString();
        } else {
            try {
                cookie = "";
            } catch (Exception e) {
                cookie="";
            }
        }
        return cookie;
    }

    /**
    * @Description 获取登录cookie
    * @Author  chengweiping
    * @Date   2021/4/13 16:39
    */
    public static String getCookie() {
        return cookie;
    }

    public static void setCookie(String cookie) {
        XxlJobUtil.cookie = cookie;
    }
}

远程Feign接口:XxlJobFeignApi.java

package com.cwp.data.manager.feign;

import com.cwp.data.development.api.SendEmailApi;
import com.cwp.data.intelligence.common.config.FeignConfiguration;
import com.cwp.data.intelligence.common.constants.MicroServiceConstant;
import com.cwp.data.job.api.XxlJobApi;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;

/**
 * @ClassName SendEmailFeignApi
 * @Date 2020/12/24 15:33
 */
@Component
@FeignClient(value = MicroServiceConstant.DATA_JOB_SERVICE,
        path = MicroServiceConstant.DATA_JOB_PATH,
        configuration = FeignConfiguration.class)
public interface XxlJobFeignApi extends XxlJobApi {
}

常量类:MicroServiceConstant.java

package com.cwp.data.intelligence.common.constants;


public class MicroServiceConstant {

 

    public  final  static String DATA_JOB_SERVICE="data-job-web";

 
    public  final  static String DATA_JOB_PATH="data-job";


}

Feign配置类转发请求头:FeignConfiguration.java 

package com.cwp.data.intelligence.common.config;

import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;

/**
 * @ClassName FeignConfiguration
 * @Description 设置Feign转发请求头
 * @Date 2020/12/24 14:16
 */
@Configuration
@ConditionalOnBean(value =RequestInterceptor.class)
public class FeignConfiguration implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate requestTemplate) {
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        if (null != attributes) {
            HttpServletRequest request = attributes.getRequest();
            if (null != request) {
                Enumeration<String> headerNames = request.getHeaderNames();
                if (headerNames != null) {
                    while (headerNames.hasMoreElements()) {
                        String name = headerNames.nextElement();
                        String values = request.getHeader(name);
                        requestTemplate.header(name, values);
                    }
                }
            }
        }
    }
}

返回类:R.java



package com.cwp.data.intelligence.common.utils;

import java.util.HashMap;
import java.util.Map;

public class R extends HashMap<String, Object> {
    private static final long serialVersionUID = 1L;

    public R() {
        put("code", 0);
        put("msg", "success");
    }

    public static R error() {
        return error(500, "未知异常,请联系管理员");
    }

    public static R error(String msg) {
        return error(500, msg);
    }

    public static R error(int code, String msg) {
        R r = new R();
        r.put("code", code);
        r.put("msg", msg);
        return r;
    }

    public static R ok(String msg) {
        R r = new R();
        r.put("msg", msg);
        return r;
    }

    public static R ok(Map<String, Object> map) {
        R r = new R();
        r.putAll(map);
        return r;
    }

    public static R okWithData(Object data) {
        R r = new R();
        r.put("data", data);
        return r;
    }

    public static R okWithPage(Object page) {
        R r = new R();
        r.put("page", page);
        return r;
    }

    public static R ok() {
        return new R();
    }

    public R put(String key, Object value) {
        super.put(key, value);
        return this;
    }

    public boolean isOk() {
        if (get("code") != null) {
            if (Integer.parseInt(get("code").toString()) == 0) {
                return true;
            }
        }
        return false;
    }
}

需要的依赖

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.4</version>
            </dependency>
          <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
             <version>5.4.7</version>
          </dependency>

 类似资料: