待运行队列(MasterContext.scheduleQueue)
生产者:定时时间到、或者依赖任务都执行完,往队列里添加待执行的任务
消费者:worker 定时去扫描队列里 是否有任务 需要执行
1、发送一条初始化的事件 Events.Initialize
// Master.init
executeJobPool.execute(() -> {
...
allJobList.forEach(heraAction -> {
masterContext.getDispatcher().
addJobHandler(new JobHandler(heraAction.getId().toString(), this, masterContext));
heraActionMap.put(heraAction.getId(), heraAction);
});
// 发送一条初始化的事件
masterContext.getDispatcher().forwardEvent(Events.Initialize);
});
2、Dispatcher 收到这条事件后,会对每一个JobHandler 去执行初始化的handle操作,执行createScheduleJob函数(等定时时间到后,往队列添加任务)
// JobHandler
public void handleInitialEvent() {
/**
* 如果是定时任务,启动定时程序,独立调度任务,创建quartz调度
*
*/
HeraActionVo heraActionVo = cache.getHeraActionVo();
boolean isSchedule = heraActionVo.getAuto() && Objects.equals(heraActionVo.getScheduleType(), JobScheduleTypeEnum.Independent);
if (isSchedule) {
try {
createScheduleJob(masterContext.getDispatcher(), heraActionVo);
} catch (Exception e) {
if (e instanceof SchedulerException) {
heraActionVo.setAuto(false);
ErrorLog.error("create job quartz schedule error");
}
throw new RuntimeException(e);
}
}
}
1、如果生成的版本大于当前时间,发送一条任务更新事件
// Master
private boolean generateAction(boolean isSingle, Integer jobId) {
try {
if (execute || isSingle) {
Dispatcher dispatcher = masterContext.getDispatcher();
if (dispatcher != null) {
if (actionMap.size() > 0) {
for (Long id : actionMap.keySet()) {
dispatcher.addJobHandler(new JobHandler(id.toString(), masterContext.getMaster(), masterContext));
if (id >= Long.parseLong(currString)) {
// 如果生成的版本大于当前时间,发送一条任务更新事件
dispatcher.forwardEvent(new HeraJobMaintenanceEvent(Events.UpdateActions, id.toString()));
}
}
}
}
}
}
}
2、Dispatcher 收到这条事件后,判断如果是定时任务的话,执行createScheduleJob函数(等定时时间到后,往队列添加任务)
// JobHandler
private void autoRecovery() {
/**
* 如果是独立任务,则重新创建quartz调度
*
*/
if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) {
try {
createScheduleJob(masterContext.getDispatcher(), heraActionVo);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
}
1、当任务执行成功后,发送一条HeraJobSuccessEvent事件。所有的JobHandler去处理这条事件。
2、如果该任务 对应的依赖任务 正好是这条已经执行成功的任务,将其依赖任务的这条改成已经执行状态。
3、如果该任务 所有的依赖任务都处于执行成功状态,那么将该任务添加进执行队列
// JobHandler
private void handleSuccessEvent(HeraJobSuccessEvent event) {
JobStatus jobStatus;
synchronized (this) {
jobStatus = heraJobActionService.findJobStatus(actionId);
ScheduleLog.info(actionId + "received a success dependency job with actionId = " + jobId);
// 将任务 对应的依赖任务状态 改成已经执行状态
jobStatus.getReadyDependency().put(jobId, String.valueOf(System.currentTimeMillis()));
heraJobActionService.updateStatus(jobStatus);
}
boolean allComplete = true;
for (String key : heraActionVo.getDependencies()) {
if (jobStatus.getReadyDependency().get(key) == null) {
allComplete = false;
break;
}
}
// 如果该任务 所有的依赖任务都处于执行成功状态
if (allComplete) {
ScheduleLog.info("JobId:" + jobId + " all dependency jobs is ready,run!");
// 那么将该任务添加进执行队列
startNewJob(event.getTriggerType(), heraActionVo);
} else {
ScheduleLog.info(actionId + "some of dependency is not ready, waiting" + JSONObject.toJSONString(jobStatus.getReadyDependency().keySet()));
}
}