当前位置: 首页 > 知识库问答 >
问题:

如何在Java中触发动态调度作业并取消它们?

邹英发
2023-03-14

我怎样才能动态调用一个作业并安心地取消它们?我是否可以触发一个在特定时刻运行的延迟任务,如果该时刻没有过去,就取消它们,就像闹钟一样?

共有2个答案

谢俊英
2023-03-14

我发现了另一个用于此任务的企业作业调度框架,称为PowerJob。通过它提供的OpenAPI,可以轻松创建和取消延迟的任务。这里有源代码<首先,在项目的指导下启动项目。

然后,创建我们自己的项目。我们需要PowerJob worker和PowerJob客户端
所以Pom文件中的依赖项如下:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.github.kfcfans</groupId>
            <artifactId>powerjob-client</artifactId>
            <version>3.4.3</version>
        </dependency>

        <dependency>
            <groupId>com.github.kfcfans</groupId>
            <artifactId>powerjob-worker-spring-boot-starter</artifactId>
            <version>3.4.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
        <dependency>
            <groupId>org.hibernate.validator</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>7.0.0.Final</version>
        </dependency>

使用的实体:

响应类:

import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Response {
    private Integer code;
    private String message;
    private JSONObject info;
    
    public static Response success(JSONObject data) {
        return Response.builder().code(200).message("success").info(data).build();
    }
    
    public static Response error(JSONObject data) {
        return Response.builder().code(500).message("fail").info(data).build();
    }
}

警报类:

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.time.LocalDateTime;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class AlarmClock {
    private Long id;
    
    @NotBlank(message = "username should not be blank.")
    private String username;
    
    private String clockName;
    
    @NotNull(message = "Delay should not be null.")
    private Long delayMillis;
    
    private Long instanceId;
    
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private LocalDateTime createTime;
    
    @Override
    public String toString() {
        return JSON.toJSONStringWithDateFormat(this, JSON.DEFFAULT_DATE_FORMAT);
    }
}

任务就像:

import com.alibaba.fastjson.JSON;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.github.powerjobdemo.entity.AlarmClock;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
public class AlarmClockTask implements BasicProcessor {
    
    public static final DateTimeFormatter STANDARD_DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
    @Override
    public ProcessResult process(TaskContext taskContext) throws Exception {
        OmsLogger omsLogger = taskContext.getOmsLogger();
        String instanceParams = taskContext.getInstanceParams();
        omsLogger.info("instance params:{}", instanceParams);
        AlarmClock alarmClock = JSON.parseObject(instanceParams, AlarmClock.class);
        assert alarmClock != null;
        String username = alarmClock.getUsername();
        omsLogger.info("Current time is:{}", STANDARD_DATE_TIME.format(LocalDateTime.now()));
        omsLogger.info("Clock info: id:{}, name:{}, creator:{}", alarmClock.getId(), alarmClock.getClockName(), username);
        return new ProcessResult(true, String.format("User: %s running an alarm clock.", username));
    }
}

服务接口如下所示:

import com.alibaba.fastjson.JSONObject;
import com.github.powerjobdemo.entity.AlarmClock;

public interface ClockService {
    /**
     * Add alarm clock.
     *
     * @param alarmClock alarm clock
     * @return json
     */
    JSONObject addAlarmClock(AlarmClock alarmClock);
    
    /**
     * Cancel alarm clock.
     *
     * @param alarmClock alarm clock
     */
    JSONObject cancelAlarmClock(AlarmClock alarmClock);
    
    /**
     * Query all alarm clocks.
     *
     * @param username username
     * @return list
     */
    JSONObject queryAll(String username);
}

服务impl类类似于:

import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.powerjobdemo.entity.AlarmClock;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Service
public class ClockServiceImpl implements ClockService {
    
    public static final DateTimeFormatter CLOCK_NAME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");
    
    private static final List<AlarmClock> alarmClockList = new CopyOnWriteArrayList<>();
    
    private final AtomicLong clockCount = new AtomicLong();
    
    @Resource
    private OhMyClient ohMyClient;
    
    @Value("${powerjob.task.id}")
    private Long taskId;
    
    @Override
    public JSONObject addAlarmClock(AlarmClock alarmClock) {
        String formattedName = CLOCK_NAME_FORMATTER.format(LocalDateTime.now());
        alarmClock.setClockName("Clock-" + formattedName);
        long id = clockCount.addAndGet(1L);
        alarmClock.setCreateTime(LocalDateTime.now());
        Long delayMillis = alarmClock.getDelayMillis();
        ResultDTO<Long> longResultDTO = ohMyClient.runJob(taskId, alarmClock.toString(), delayMillis);
        alarmClock.setInstanceId(longResultDTO.getData());
        alarmClockList.add(alarmClock);
        JSONObject data = new JSONObject();
        data.put("id", id);
        return data;
    }
    
    @Override
    public JSONObject cancelAlarmClock(AlarmClock alarmClock) {
        Long instanceId = alarmClock.getInstanceId();
        assert instanceId != null;
        ohMyClient.cancelInstance(instanceId);
        alarmClockList.removeIf(clock -> Objects.equals(instanceId, clock.getInstanceId()));
        JSONObject data = new JSONObject();
        data.put("instanceId", instanceId);
        return data;
    }
    
    @Override
    public JSONObject queryAll(String username) {
        List<AlarmClock> clockList = alarmClockList.stream()
                .filter(alarmClock -> StringUtils.equals(alarmClock.getUsername(), username))
                .collect(Collectors.toList());
        JSONObject data = new JSONObject();
        data.put("data", clockList);
        data.put("count", clockList.size());
        return data;
    }
}

最后,控制器是这样的:

import com.github.powerjobdemo.entity.AlarmClock;
import com.github.powerjobdemo.entity.Response;
import com.github.powerjobdemo.service.ClockService;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

@RestController
@CrossOrigin
public class AlarmClockController {
    
    @Resource
    private ClockService clockService;
    
    @PostMapping(value = "/api/v1/alarm/clock/add")
    public Response addAlarmClock(@RequestBody AlarmClock alarmClock) {
        return Response.success(clockService.addAlarmClock(alarmClock));
    }
    
    @PostMapping(value = "/api/v1/alarm/clock/cancel")
    public Response cancelAlarmClock(@RequestBody AlarmClock alarmClock) {
        return Response.success(clockService.cancelAlarmClock(alarmClock));
    }
    
    @GetMapping(value = "/api/v1/alarm/clock/query")
    public Response queryAlarmClock(@RequestParam String username) {
        return Response.success(clockService.queryAll(username));
    }
}

所以我们可以发布api/api/v1/alarm/clock/add来添加新的闹钟。例如,创建一个600秒后运行的闹钟。

curl --location --request POST 'http://localhost:8080/api/v1/alarm/clock/add' \
--header 'Content-Type: application/json' \
--data-raw '{
  "username": "Jimmy",
  "delayMillis": 600000
}'

答案是:

{
    "code": 200,
    "message": "success",
    "info": {
        "id": 1
    }
}

然后询问。

curl --location --request GET 'http://localhost:8080/api/v1/alarm/clock/query?username=Jimmy'

答案是:

{
    "code": 200,
    "message": "success",
    "info": {
        "data": [
            {
                "id": 1,
                "username": "Jimmy",
                "clockName": "Clock-20210118-002804",
                "delayMillis": 600000,
                "instanceId": 231210525113975104,
                "createTime": "2021-01-18 00:28:04"
            }
        ],
        "count": 1
    }
}

要取消时钟:

发布取消API:

curl --location --request POST 'http://localhost:8080/api/v1/alarm/clock/cancel' \
--header 'Content-Type: application/json' \
--data-raw '{"instanceId": "231210525113975104"}'

答复如下:

{
    "code": 200,
    "message": "success",
    "info": {
        "instanceId": 231210525113975104
    }
}

我们可以在网页上看到所有实例,这很有帮助。任务实例列表

闾丘成礼
2023-03-14

Quartz是一个很好的调度库,它有很多功能,比如运行许多作业和简单触发器,cron触发器同时在一台机器中或集群中。此外,它可以在内存上运行,也可以在数据库上持久化。有关更多详细信息,请使用Quartz在Spring中进行调度

我已经创建了一个基本的设置,重点是调度的概念。有很多方法可以创建、列出和终止工作。添加了线程。睡眠用于模拟长时间运行的作业。

创造一份新工作

POST http://localhost:8080/start/foo

Trigger is created. Job name is 'foo-1609322783667'

按作业列出触发器

GET http://localhost:8080/list/foo

[
    "foo-1609322783667"
]

干掉跑步的工作

DELETE http://localhost:8080/kill/foo

Job is interrupted

控制台输出:

2020-12-30 13:06:23.671  INFO 920 --- [nio-8080-exec-3] com.example.demo.HomeController          : Job is created. It will be triggered at Wed Dec 30 13:06:28 EET 2020
2020-12-30 13:06:28.681  INFO 920 --- [eduler_Worker-1] com.example.demo.job.FooJob              : Job started DEFAULT.foo-1609322783667
2020-12-30 13:06:51.109  INFO 920 --- [eduler_Worker-1] com.example.demo.job.FooJob              : Job is interrupted DEFAULT.foo-1609322783667
2020-12-30 13:06:51.109  INFO 920 --- [eduler_Worker-1] com.example.demo.job.FooJob              : Job completed DEFAULT.foo-1609322783667

波姆。xml(如果您使用的是Gradle,则可以在build.Gradle上更改定义)

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

应用属性

spring.quartz.job-store-type=memory

作业配置

@Configuration
public class JobConfig {
    @Bean
    public JobDetailFactoryBean fooJobDetail() {
        JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
        jobDetailFactory.setJobClass(FooJob.class);
        jobDetailFactory.setDurability(true);
        return jobDetailFactory;
    }

    @Bean
    public JobDetailFactoryBean barJobDetail() {
        JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
        jobDetailFactory.setJobClass(BarJob.class);
        jobDetailFactory.setDurability(true);
        return jobDetailFactory;
    }
}

酒吧

@Slf4j
@Service
public class BarJob implements InterruptableJob {
    private Thread thread;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("Job started {}", jobExecutionContext.getTrigger().getKey());
        thread = Thread.currentThread();
        try {
            Thread.sleep(50_000); // wait 50 seconds
        } catch (InterruptedException ex) {
            log.info("Job is interrupted {}", jobExecutionContext.getTrigger().getKey());
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
        log.info("Job completed {}", jobExecutionContext.getTrigger().getKey());
    }

    @Override
    public void interrupt() throws UnableToInterruptJobException {
        thread = Thread.currentThread();
    }
}

FooJob

@Slf4j
@Service
public class FooJob implements InterruptableJob {
    private Thread thread;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("Job started {}", jobExecutionContext.getTrigger().getKey());
        thread = Thread.currentThread();
        try {
            Thread.sleep(100_000); // wait 100 seconds
        } catch (InterruptedException ex) {
            log.info("Job is interrupted {}", jobExecutionContext.getTrigger().getKey());
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
        log.info("Job completed {}", jobExecutionContext.getTrigger().getKey());
    }

    @Override
    public void interrupt() throws UnableToInterruptJobException {
        thread.interrupt();
    }
}

主页控制器

@RestController
@Slf4j
public class HomeController {
    @Autowired
    private Scheduler scheduler;

    @Autowired
    @Qualifier("fooJobDetail")
    private JobDetail fooJobDetail;

    @Autowired
    @Qualifier("barJobDetail")
    private JobDetail barJobDetail;

    @PostMapping("/start/{jobName}")
    public ResponseEntity<String> startJob(@PathVariable("jobName") String jobName) throws SchedulerException {
        Optional<JobDetail> jobDetail = parseJob(jobName);
        if (!jobDetail.isPresent()) {
            return ResponseEntity.badRequest().body("Invalid job name");
        }

        Trigger trigger = TriggerBuilder.newTrigger()
                .forJob(jobDetail.get())
                .withIdentity(jobName + "-" + new Date().getTime()) // unique name
                .startAt(Date.from(Instant.now().plusSeconds(5)))   // starts 5 seconds later
                .build();

        Date date = scheduler.scheduleJob(trigger);
        log.info("Job is created. It will be triggered at {}", date);

        return ResponseEntity.ok("Trigger is created. Job name is '" + trigger.getKey().getName() + "'");
    }

    /**
     * Find the job by job name
     */
    private Optional<JobDetail> parseJob(String jobName) {
        if ("foo".equals(jobName)) {
            return Optional.of(fooJobDetail);
        } else if ("bar".equals(jobName)) {
            return Optional.of(barJobDetail);
        }
        return Optional.empty();
    }

    @GetMapping("/list/{jobName}")
    public ResponseEntity<List<String>> listTriggers(@PathVariable("jobName") String jobName) throws SchedulerException {
        Optional<JobDetail> jobDetail = parseJob(jobName);
        if (!jobDetail.isPresent()) {
            return ResponseEntity.badRequest().build();
        }

        List<String> triggers = scheduler.getTriggersOfJob(jobDetail.get().getKey()).stream()
                .map(t -> t.getKey().getName())
                .collect(Collectors.toList());
        return ResponseEntity.ok(triggers);
    }

    @DeleteMapping("/kill/{jobName}")
    public ResponseEntity<String> killTrigger(@PathVariable("jobName") String jobName) throws SchedulerException {
        Optional<JobDetail> jobDetail = parseJob(jobName);
        if (!jobDetail.isPresent()) {
            return ResponseEntity.badRequest().build();
        }
        scheduler.interrupt(jobDetail.get().getKey());
        return ResponseEntity.ok("Job is interrupted");
    }
}
 类似资料:
  • 问题内容: 如何设置Jenkins作业以触发在变量/参数中动态定义的作业? 我的问题的伪代码: 生成操作“触发/调用在其他项目上生成”以及生成后操作“参数化触发器插件”和“生成其他项目”不允许在参数中定义作业名称。 我似乎没有将GroovyAxis插件与Build多重配置项目一起使用。常规代码似乎不在作业构建环境中运行,而是在作业配置保存期间运行。 我了解您可以通过脚本通过其build-start

  • 我是石英调度器的新手。我有一个批处理文件,它将需要3分钟运行。我需要运行这批每2分钟使用石英调度器。所以我每天安排3个小时。我的问题是我需要检查第一个触发器的状态,如果它不是完整的状态,我需要从这个工作出来。我需要继续我安排的下一个工作。说明:作业53触发器在上午11.30开始,下一个触发器在上午11.32开始,下一个触发器在上午11.34开始,我需要检查上午11.30的触发器状态,如果它不是co

  • 是否可以添加/删除/修改在Quartz Spring Boot中动态安排的作业(在运行时),由使用我的门户的最终用户。由于计划无法从外部访问,我不知道有什么办法。基本上,我需要将所有的时间表信息存储到数据库中并访问它们。Im构建的门户将被大量用户使用,实现这一目标的正确解决方案是什么? 否则我可以像下面这样使用cron吗 每5 mns扫描一次作业以实现此目的。

  • 我正在使用一个石英工作执行特定的任务。 如果另一个Main Job实例仍在运行,我想阻止调度器启动第二个Main Job实例...

  • 我是hadoop新手,我写了一些作业并将它们导出为jar文件。我可以使用hadoop jar命令运行它们,我想每一小时运行一次这些作业。我该怎么做?提前谢谢。

  • 当我启动石英调度程序时,它不会触发我的工作。我的工作被安排在每个小时。但是启动我的调度程序后,我的第一个工作在一个小时后被触发。我是石英新手。下面是我的石英启动代码