light task schedule(以下简称lts)的任务调度是在jobTracker中实现的。
以下关于任务队列的数据存储以mysql为例子。
jobTracker中进行调度的任务需要jobClient提交,当接收任务提交对任务的相关信息进行存储的同时也会开始对相应的任务进行调度。
以mysql为例子,对于一个节点组nodeGroup的cron形式的定时任务的定时调度涉及三张表。
lts_cron_job_queue表用来存放jobTracker中收到的cron任务提交信息,cron任务的配置数据存放在这张表中。
lts_executing_job_queue表用来存放已经下发给工作节点taskTracker执行的任务(并没有处理完)。
而lts_wjq_ + nodeGroup名,则表示当前已经存放在数据库的等待执行的任务。
在任务正式从jobClient提交到jobTracker中时,任务的定时调用也准备开始。
jobTracker在接收到jobClient的任务提交请求后,会遍历提交的任务依次调用addToQueue()方法将相应的任务进行存储和调度。
for (Job job : jobs) {
try {
addToQueue(job, request);
} catch (Exception e) {
if (exception == null) {
exception = new JobReceiveException(e);
}
exception.addJob(job);
}
}
在addToQueue()中,实则是通过addJob()方法来进行任务的添加。
private void addJob(Job job, JobPo jobPo) throws DupEntryException {
if (job.isCron()) {
addCronJob(jobPo);
} else if (job.isRepeatable()) {
addRepeatJob(jobPo);
} else {
addTriggerTimeJob(jobPo);
}
}
那么可以看到,这里的提交的任务根据任务的配置方式给了三种不同的方式进行任务的添加,分别是cron表达式配置的任务,根据参数设置多久时间间隔执行的repeat类型的任务,和直接配置触发时间的定时任务,这里以cron表达式任务作为例子继续往下走,看到addCronJob()方法。
private void addCronJob(JobPo jobPo) throws DupEntryException {
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(jobPo.getCronExpression());
if (nextTriggerTime != null) {
if (appContext.getRepeatJobQueue().getJob(jobPo.getTaskTrackerNodeGroup(), jobPo.getTaskId()) != null) {
// 这种情况是 由repeat 任务变为了 Cron任务
throw new DupEntryException();
}
// 1.add to cron job queue
appContext.getCronJobQueue().add(jobPo);
if (JobUtils.isRelyOnPrevCycle(jobPo)) {
// 没有正在执行, 则添加
if (appContext.getExecutingJobQueue().getJob(jobPo.getTaskTrackerNodeGroup(), jobPo.getTaskId()) == null) {
// 2. add to executable queue
jobPo.setTriggerTime(nextTriggerTime.getTime());
try {
jobPo.setInternalExtParam(Constants.EXE_SEQ_ID, JobUtils.generateExeSeqId(jobPo));
appContext.getExecutableJobQueue().add(jobPo);
} catch (DupEntryException e) {
appContext.getCronJobQueue().remove(jobPo.getJobId());
throw e;
}
}
} else {
// 对于不需要依赖上一周期的,采取批量生成的方式
appContext.getNonRelyOnPrevCycleJobScheduler().addScheduleJobForOneHour(jobPo);
}
}
}
首先,对于cron类型的任务,首先会添加至之前介绍的第一张表lts_cron_job_queue中,存放当前任务的配置信息。
这里的添加cron任务会根据一个isRelyOnPrevCycle属性来确认不同的处理方式,这个属性代表当前添加的任务的触发依不依赖于该任务上一次的执行结果。
也就是说,当这个参数如果设置为false,也就是该任务的触发并不依赖于上一次是否执行完毕,所以可以放心的根据cron表达式计算出该任务的在接下来一段时间中的所有触发次数并记录在库中。而如果为ture,那么只计算该任务下一次的触发时间记录在库中,接下来的触发只能在下一次执行完毕之后进行计算。
那么目前假设该属性为false,那么则会在addScheduleJobForOneHour()方法中,批量将接下来一段时间的该任务的执行情况全部插入在之前所提到的第三张表,”lts_wjq_ + nodeGroup名”表中。虽然该方法名中写的是接下来一小时之内的触发情况,但实则默认情况下是接下来十分钟之内的触发情况。
public static void addCronJobForInterval(ExecutableJobQueue executableJobQueue,
CronJobQueue cronJobQueue,
int scheduleIntervalMinute,
final JobPo finalJobPo,
Date lastGenerateTime) {
JobPo jobPo = JobUtils.copy(finalJobPo);
String cronExpression = jobPo.getCronExpression();
long endTime = DateUtils.addMinute(lastGenerateTime, scheduleIntervalMinute).getTime();
Date timeAfter = lastGenerateTime;
boolean stop = false;
while (!stop) {
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronExpression, timeAfter);
if (nextTriggerTime == null) {
stop = true;
} else {
if (nextTriggerTime.getTime() <= endTime) {
// 添加任务
jobPo.setTriggerTime(nextTriggerTime.getTime());
jobPo.setJobId(JobUtils.generateJobId());
jobPo.setTaskId(finalJobPo.getTaskId() + "_" + DateUtils.format(nextTriggerTime, "MMdd-HHmmss"));
jobPo.setInternalExtParam(Constants.ONCE, Boolean.TRUE.toString());
try {
jobPo.setInternalExtParam(Constants.EXE_SEQ_ID, JobUtils.generateExeSeqId(jobPo));
executableJobQueue.add(jobPo);
} catch (DupEntryException e) {
LOGGER.warn("Cron Job[taskId={}, taskTrackerNodeGroup={}] Already Exist in ExecutableJobQueue",
jobPo.getTaskId(), jobPo.getTaskTrackerNodeGroup());
}
} else {
stop = true;
}
}
timeAfter = nextTriggerTime;
}
cronJobQueue.updateLastGenerateTriggerTime(finalJobPo.getJobId(), endTime);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Add CronJob {} to {}", jobPo, DateUtils.formatYMD_HMS(new Date(endTime)));
}
}
具体的逻辑实现在了addCronJobForInterval()方法中,lastGenerateTime为当前时间,而scheduleIntervalMinute默认则是十分钟,这里首先得到十分钟后的时间为endTime,之后不断计算cron表达式的下一次触发时间直到endTime之前,每一次成功的计算都代表在endTime之前一次任务的调用,都会记录在之前之前提到的第三张”lts_wjq_ + nodeGroup名”表中,等待被调用。
在计算到endTime之后,结束计算,并且将endTime更新到cron表中,作为下一次计算任务触发时间的依据。
到此为止,任务的添加的主要逻辑已经结束,此时存在两个问题,一,下一次的任务计算将是什么时候,二,任务的准时触发究竟是怎么做到的。
第一个问题很简单。
executorService = Executors.newScheduledThreadPool(1, new NamedThreadFactory(NonRelyOnPrevCycleJobScheduler.class.getSimpleName(), true));
this.scheduledFuture = executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (running.compareAndSet(false, true)) {
try {
schedule();
} finally {
running.set(false);
}
}
} catch (Throwable t) {
LOGGER.error("Error On Schedule", t);
}
}
}, 10, (scheduleIntervalMinute - 1) * 60, TimeUnit.SECONDS);
默认每隔九分钟,通过定时任务线程池实现一次schedule()方法。
schedule()方法并不复杂。以cron任务为例子,只是从lts_cron_job_queue表中获得所有isRelyOnPrevCycle属性为false的任务,重新根据上一次的endTime计算默认接下来十分钟的所有任务触发并存到相应的表中。
接下来第二个问题,任务的触发
关于具体执行任务的工作节点taskTrcaker,在启动后会每隔一秒去向负责调度的jobTracker节点发送拉去任务的请求。
private void start() {
try {
if (start.compareAndSet(false, true)) {
if (scheduledFuture == null) {
scheduledFuture = executorService.scheduleWithFixedDelay(worker, jobPullFrequency * 1000, jobPullFrequency * 1000, TimeUnit.MILLISECONDS);
}
LOGGER.info("Start Job pull machine success!");
}
} catch (Throwable t) {
LOGGER.error("Start Job pull machine failed!", t);
}
}
图中的woker的run()方法正是实现了具体的发送拉取任务请求的逻辑。
那么我们回到负责调度的jobTracker在接收到请求所处理的行为。
在接收到拉取请求后,将会异步去通过push0()方法去尝试拉取任务并分配。
public void push(final JobPullRequest request) {
this.executorService.submit(new Runnable() {
@Override
public void run() {
try {
push0(request);
} catch (Exception e) {
LOGGER.error("Job push failed!", e);
}
}
});
}
那么我们可以看到push0()的具体实现。
int it = availableThread % batchSize == 0 ? availableThread / batchSize : availableThread / batchSize + 1;
final CountDownLatch latch = new CountDownLatch(it);
for (int i = 1; i <= it; i++) {
int size = batchSize;
if (i == it) {
size = availableThread - batchSize * (it - 1);
}
final int finalSize = size;
pushExecutorService.execute(new Runnable() {
@Override
public void run() {
try {
// 推送任务
send(remotingServer, finalSize, taskTrackerNode);
} catch (Throwable t) {
LOGGER.error("Error on Push Job to {}", taskTrackerNode, t);
} finally {
latch.countDown();
}
}
});
}
TaskTracker在发送拉取任务的请求的时候,会将自己的可用工作线程数量一并发送给jobTacker,这里会根据可用线程数量与规定的一次最大拉取数量相除得到一共需要发送任务执行请求的次数,并一次调用,异步去返回taskTracker所需要的任务。
在接下来的send()方法中,主要调用了jobSender()的send()方法。
List<JobPo> jobPos = fetchJob(taskTrackerNodeGroup, taskTrackerIdentity, size);
if (jobPos.size() == 0) {
return new SendResult(false, JobPushResult.NO_JOB);
}
在这里的核心,就在于fetchJob()方法,也就是说,真正被即时调用的任务将会在这里被取得。
private List<JobPo> fetchJob(String taskTrackerNodeGroup, String taskTrackerIdentity, int size) {
List<JobPo> jobPos = new ArrayList<JobPo>(size);
for (int i = 0; i < size; i++) {
final JobPo jobPo = appContext.getPreLoader().take(taskTrackerNodeGroup, taskTrackerIdentity);
if (jobPo == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Job push failed: no job! nodeGroup=" + taskTrackerNodeGroup + ", identity=" + taskTrackerIdentity);
}
break;
}
// IMPORTANT: 这里要先切换队列
try {
appContext.getExecutingJobQueue().add(jobPo);
} catch (DupEntryException e) {
LOGGER.warn("ExecutingJobQueue already exist:" + JSON.toJSONString(jobPo));
appContext.getExecutableJobQueue().resume(jobPo);
continue;
}
appContext.getExecutableJobQueue().remove(jobPo.getTaskTrackerNodeGroup(), jobPo.getJobId());
jobPos.add(jobPo);
}
return jobPos;
}
这里会根据要取得的任务数量大小,依次根据请求的taskTracker的nodeGroup去通过PreLoader的take()方法得到所需要的任务。那么继续往下看,但是更详细的逻辑实现在take()方法后的get()方法中。
private JobPo get(String taskTrackerNodeGroup) {
JobPriorityBlockingDeque queue = getQueue(taskTrackerNodeGroup);
int size = queue.size();
DotLogUtils.dot("AbstractPreLoader.queue size:{},taskTrackerNodeGroup:{}", size, taskTrackerNodeGroup);
if (isInFactor(size)) {
// 触发加载的请求
if (!LOAD_SIGNAL.contains(taskTrackerNodeGroup)) {
LOAD_SIGNAL.add(taskTrackerNodeGroup);
doLoad();
}
}
JobPo jobPo = queue.poll();
if (jobPo != null && jobPo.getPriority() == Integer.MIN_VALUE) {
if (CollectionUtils.isNotEmpty(jobPo.getInternalExtParams())) {
if (jobPo.getInternalExtParams().containsKey(Constants.OLD_PRIORITY)) {
try {
int priority = Integer.parseInt(jobPo.getInternalExtParam(Constants.OLD_PRIORITY));
jobPo.getInternalExtParams().remove(Constants.OLD_PRIORITY);
jobPo.setPriority(priority);
} catch (NumberFormatException ignored) {
}
}
}
}
return jobPo;
}
这里引出了一个新的数据结构,JobPriorityBlockingDeque,也就是,最后任务的获取反倒是根据这个队列的poll()方法来取得队列中的第一个数据,也就是我们要的任务。那么来看这个队列的实现。
public JobPriorityBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
this.list = new LinkedList<JobPo>();
this.comparator = new Comparator<JobPo>() {
@Override
public int compare(JobPo left, JobPo right) {
if (left.getJobId().equals(right.getJobId())) {
return 0;
}
int compare = left.getPriority().compareTo(right.getPriority());
if (compare != 0) {
return compare;
}
compare = left.getTriggerTime().compareTo(right.getTriggerTime());
if (compare != 0) {
return compare;
}
compare = left.getGmtCreated().compareTo(right.getGmtCreated());
if (compare != 0) {
return compare;
}
return -1;
}
};
}
这个队列的实现实则是一个链表,但是实现了一个比较器comparator,当任务入队的时候将会根据比较器根据优先级触发时间等排序,当需要当前优先级最高并且触发时间最近的任务的时候只要简单的取得链表的第一个数据就可以了,这里通过参数规定了链表的最大大小。
那么这个链表何时会有数据进入。
scheduledFuture = LOAD_EXECUTOR_SERVICE.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
doLoad();
}
}, interval, interval, TimeUnit.MILLISECONDS);
默认情况下,PreLoader每隔0.1秒会调用一次doLoad()方法。
private void doLoad() {
for (final String loadTaskTrackerNodeGroup : LOAD_SIGNAL) {
new Thread(new Runnable() {
@Override
public void run() {
AtomicBoolean loading = LOADING.get(loadTaskTrackerNodeGroup);
if (loading == null) {
loading = new AtomicBoolean(false);
AtomicBoolean _loading = LOADING.putIfAbsent(loadTaskTrackerNodeGroup, loading);
if (_loading != null) {
loading = _loading;
}
}
if (loading.compareAndSet(false, true)) {
try {
handleSignal(loadTaskTrackerNodeGroup);
} finally {
loading.compareAndSet(true, false);
}
}
}
}).start();
}
}
在doLoad()方法中,会对所有存在的nodeGroup也就是工作节点组调用一次handleSignal()方法尝试加载一次工作节点组对应的优先级队列。
在handleSignal()方法中,如果强制执行,或者默认情况下当前队列中的数据小于容量的百分之二十,将会从nodeGroup对应的任务表中进行任务加载,加载到内存中的优先级队列中。
protected List<JobPo> load(String loadTaskTrackerNodeGroup, int loadSize) {
try {
return new SelectSql(sqlTemplate)
.select()
.all()
.from()
.table(getTableName(loadTaskTrackerNodeGroup))
.where("is_running = ?", false)
.and("trigger_time< ?", SystemClock.now())
.orderBy()
.column("priority", OrderByType.ASC)
.column("trigger_time", OrderByType.ASC)
.column("gmt_created", OrderByType.ASC)
.limit(0, loadSize)
.list(RshHolder.JOB_PO_LIST_RSH);
} catch (Exception e) {
LOGGER.error("Error when load job:" + e.getMessage(), e);
return null;
}
}
在加载的时候会获取之前所提到的第三张表中,根据所有没有被分配,并且触发时间已经小于当前时间的任务按照优先级触发时间等排序结果后的loadSize数量的任务加载到内存的优先级队列。
加载完毕后,正式根据PreLoader()从优先级队列中取得所要返回的任务。
在返回任务前,在PreLoader()的take()方法中,会修改之前第三张表中取得的任务的running属性,防止并发情况下,该任务被分配到多个线程中。
if (lockJob(taskTrackerNodeGroup, jobPo.getJobId(),
taskTrackerIdentity, jobPo.getTriggerTime(),
jobPo.getGmtModified())) {
jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
jobPo.setIsRunning(true);
jobPo.setGmtModified(SystemClock.now());
return jobPo;
}
最后回到fetchJob()中,在从PreLoader()取得任务后,将得到的任务加入到之前提到的第二张表lts_executing_job_queue表中,代表这任务已经被分配并准备执行,之后从取得的表中,也就是第三张表,nodeGroup对应的待执行的任务中删掉该次任务,代表已经被执行。
try {
appContext.getExecutingJobQueue().add(jobPo);
} catch (DupEntryException e) {
LOGGER.warn("ExecutingJobQueue already exist:" + JSON.toJSONString(jobPo));
appContext.getExecutableJobQueue().resume(jobPo);
continue;
}
appContext.getExecutableJobQueue().remove(jobPo.getTaskTrackerNodeGroup(), jobPo.getJobId());
之后这些任务将会返回到taskTracker中被立即执行,定时任务被触发,由于taskTracker一秒一次的发起拉取任务的请求,可能会导致定时任务距离规定时间存在一秒之内的误差。