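Taking a Java job as an example, the execution of a single job always starts from the execute function of public abstract class AbstractElasticJob;
execute itself is triggered from the run method of public class SaturnWorker implements Runnable.
The execute function is mainly responsible for running the job's shards and, when failover is enabled, picking up failover work afterwards.
The key code is as follows: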
executeJobInternal(shardingContext);
if (isFailoverSupported() && configService.isFailover() && !stopped && !forceStopped && !aborted) {
failoverService.failoverIfNecessary();
}
As the name suggests, executeJobInternal handles the internal execution, including the failover of failed shards.
Its finally block contains the following handling:
if (isFailoverSupported() && configService.isFailover()) {
failoverService.updateFailoverComplete(item);
}
}
}
runDownStream(shardingContext);
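That is, it sets the failover-complete flag (other nodes will in fact check again whether failover has completed) and calls the downstream passive jobs. Note that this sits in the finally block, which means it runs whether the job shard itself succeeded or failed. In practice, however, the downstream job is not executed if any shard failed, because
runDownStream contains the following code: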
if (!mayRunDownStream(shardingContext)) {
return;
}
AbstractSaturnJob overrides the mayRunDownStream function so that a single failed shard is enough to keep the downstream job from being triggered.
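The interplay between the finally block and the mayRunDownStream gate can be sketched as follows. This is only a minimal illustration: the class DownstreamGateSketch and its fields are hypothetical stand-ins, not Saturn's real classes, and only the method names execute, executeJobInternal, runDownStream and mayRunDownStream follow the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the execute()/runDownStream() flow; not Saturn's real classes. */
class DownstreamGateSketch {
    /** Set to true once the (simulated) downstream job has been triggered. */
    boolean downstreamTriggered = false;
    /** Per-shard outcome: shard item -> true if that shard succeeded. */
    final Map<Integer, Boolean> shardResults = new LinkedHashMap<>();

    void execute(Map<Integer, Boolean> simulatedOutcome) {
        try {
            executeJobInternal(simulatedOutcome);
        } finally {
            // Always reached, whether the shards succeeded or failed.
            runDownStream();
        }
    }

    /** Assumption: the real shard work is replaced by a pre-computed outcome map. */
    void executeJobInternal(Map<Integer, Boolean> simulatedOutcome) {
        shardResults.putAll(simulatedOutcome);
    }

    /** One failed shard is enough to block the downstream job. */
    boolean mayRunDownStream() {
        return !shardResults.containsValue(Boolean.FALSE);
    }

    void runDownStream() {
        if (!mayRunDownStream()) {
            return;
        }
        downstreamTriggered = true;
    }

    public static void main(String[] args) {
        Map<Integer, Boolean> outcome = new LinkedHashMap<>();
        outcome.put(0, true);
        outcome.put(1, false); // shard 1 failed
        DownstreamGateSketch sketch = new DownstreamGateSketch();
        sketch.execute(outcome);
        System.out.println("downstream triggered: " + sketch.downstreamTriggered);
    }
}
```

The point of the sketch is that runDownStream is reached on every path, and the decision not to trigger the downstream job is made inside it, not by skipping the call.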
A job shard is not executed directly in the worker thread; instead it is submitted to an executor service queue.
Below is the thread stack at the point of submission:
Thread [executor-2_DemoJob-saturnWorker] (Suspended (breakpoint at line 128 in SaturnJavaJob))
owns: HashMap<K,V> (id=397)
SaturnJavaJob.handleJob(SaturnExecutionContext) line: 128
SaturnJavaJob(AbstractSaturnJob).executeJob(JobExecutionMultipleShardingContext) line: 62
SaturnJavaJob(AbstractElasticJob).executeJobInternal(JobExecutionMultipleShardingContext) line: 203
SaturnJavaJob(AbstractElasticJob).execute(Triggered) line: 183
SaturnWorker.run() line: 173
Executors$RunnableAdapter<T>.call() line: 511
FutureTask<V>.run() line: 266
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149
ThreadPoolExecutor$Worker.run() line: 624
Thread.run() line: 748
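A Java job that times out can be force-killed. Roughly, this is implemented in public class SaturnJavaJob extends AbstractSaturnJob, inside
protected Map<Integer, SaturnJobReturn> handleJob: when the task that executes a job shard is submitted, a timeout task is started at the same time. The code is as follows: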
if (timeoutSeconds > 0) {
TimeoutSchedulerExecutor.scheduleTimeoutJob(shardingContext.getExecutorName(), timeoutSeconds,
shardingItemFutureTask);
}
The timeout handling code is mainly in private static class TimeoutHandleTask implements Runnable.
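The general pattern of submitting a shard task and scheduling a companion timeout task can be sketched with plain java.util.concurrent classes. This is a hedged illustration, not Saturn's implementation: TimeoutKillSketch and runWithTimeout are hypothetical names, and the sketch stops the task through cooperative interruption (Future.cancel(true)), which may differ from how Saturn force-kills a shard.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: run a shard task with a companion timeout task. */
class TimeoutKillSketch {

    /** Returns the task result, or "timeout" / "failed" / "interrupted". */
    static String runWithTimeout(Callable<String> shardWork, long timeoutMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> timeoutTask = null;
        try {
            // Submit the shard work to the executor queue instead of running it inline.
            final Future<String> shardFuture = worker.submit(shardWork);
            if (timeoutMillis > 0) {
                // Companion task: cancel (interrupt) the shard once the timeout elapses.
                timeoutTask = timeoutScheduler.schedule(
                        () -> { shardFuture.cancel(true); }, timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return shardFuture.get();
        } catch (CancellationException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "failed";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            // The shard is done one way or another, so the timeout task is no longer needed.
            if (timeoutTask != null) {
                timeoutTask.cancel(false);
            }
            worker.shutdownNow();
            timeoutScheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> "done", 1000));
        System.out.println(runWithTimeout(() -> { Thread.sleep(5000); return "done"; }, 100));
    }
}
```

As in the snippet above, the timeout task is only scheduled when the configured timeout is greater than zero.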
Job failover handling lives mainly under package com.vip.saturn.job.internal.failover.
When public class FailoverListenerManager starts (public void start()), it adds a listener on the job's execution node. The code is as follows:
zkCacheManager.addTreeCacheListener(new ExecutionPathListener(), executionPath, 1);
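When the shard items change, this listener in turn adds two listeners, one on the running child node and one on the failover child node of the item. The code is as follows: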
switch (event.getType()) {
case NODE_ADDED:
zkCacheManager.addNodeCacheListener(new RunningPathListener(item), runningPath);
runningAndFailoverPath.add(runningPath);
zkCacheManager.addNodeCacheListener(new FailoverPathJobListener(item), failoverPath);
runningAndFailoverPath.add(failoverPath);
break;
case NODE_REMOVED:
zkCacheManager.closeNodeCache(runningPath);
runningAndFailoverPath.remove(runningPath);
zkCacheManager.closeNodeCache(failoverPath);
runningAndFailoverPath.remove(failoverPath);
break;
default:
}
In both class ExecutionPathListener extends AbstractJobListener and class FailoverPathJobListener implements NodeCacheListener,
failover is actually triggered when the item node is deleted. The relevant code is as follows:
if (!executionService.isRunning(item)) {
failover(item);
}
if (!executionService.isFailover(item)) {
failover(item);
}
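Note: every executor node triggers the failover check for all shards.
The private synchronized void failover function creates the pending failover node under leader/failover/items
and, if none of the local shard items is currently running, tries to re-execute the failed job shard right away. The code is as follows: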
failoverService.createCrashedFailoverFlag(item);
if (!executionService.hasRunningItems(jobScheduler.getShardingService().getLocalHostShardingItems())) {
failoverService.failoverIfNecessary();
}
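The "mark as pending, then take over only if idle" logic above can be modelled in memory as follows. This is a rough sketch under stated assumptions: FailoverTriggerSketch and its collections are hypothetical and merely stand in for the ZooKeeper nodes (pendingFailoverItems plays the role of leader/failover/items), and leader election is left out entirely.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of the failover trigger; no ZooKeeper involved. */
class FailoverTriggerSketch {
    /** Stand-in for the leader/failover/items node: shard items waiting for failover. */
    final TreeSet<Integer> pendingFailoverItems = new TreeSet<>();
    /** Shard items this executor is currently running. */
    final Set<Integer> localRunningItems = new HashSet<>();
    /** Shard items this executor has taken over. */
    final List<Integer> takenOver = new ArrayList<>();

    /** Called when a shard is detected as crashed. */
    synchronized void failover(int item) {
        // Always record the crashed item first, so some executor can pick it up.
        pendingFailoverItems.add(item);
        // Only an idle executor takes the item over immediately.
        if (localRunningItems.isEmpty()) {
            failoverIfNecessary();
        }
    }

    /** Takes over one pending item, if there is any. */
    synchronized void failoverIfNecessary() {
        Integer item = pendingFailoverItems.pollFirst();
        if (item != null) {
            takenOver.add(item);
        }
    }

    public static void main(String[] args) {
        FailoverTriggerSketch busy = new FailoverTriggerSketch();
        busy.localRunningItems.add(0);
        busy.failover(2); // stays pending because item 0 is still running
        System.out.println("pending: " + busy.pendingFailoverItems + ", taken over: " + busy.takenOver);
    }
}
```

A busy executor leaves the item pending; this matches the call to failoverIfNecessary at the end of execute shown earlier, which gives the executor another chance to pick up pending items once its own shards have finished.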
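Before failover actually starts executing, a leader election is performed so that multiple executors do not run the same shard twice. This code is in the public void failoverIfNecessary() function of public class FailoverService: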
public void failoverIfNecessary() {
if (!needFailover()) {
return;
}
// Use leader/failover/latch to ensure that only one node executes the failover job shard
getJobNodeStorage()
.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback(), 1, TimeUnit.MINUTES,
new FailoverTimeoutLeaderExecutionCallback());
}
Note the call to executeInLeader in the code above.
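The shape of that call (a callback to run as leader, a wait limit, and a second callback for the timeout case) can be illustrated with an in-process analogy. This is only a sketch under assumptions: LeaderLatchSketch is a hypothetical class, and a local Semaphore stands in for the leader/failover/latch node, whereas the real executeInLeader has to coordinate across executor processes.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-process analogy of executeInLeader; a Semaphore replaces the latch node. */
class LeaderLatchSketch {
    /** Single permit: at most one caller can be "leader" at a time. */
    private final Semaphore latch = new Semaphore(1);

    /**
     * Runs callback if leadership is obtained within the timeout,
     * otherwise runs timeoutCallback. Returns true if callback ran.
     */
    boolean executeInLeader(Runnable callback, long timeout, TimeUnit unit, Runnable timeoutCallback) {
        boolean acquired = false;
        try {
            acquired = latch.tryAcquire(timeout, unit);
            if (acquired) {
                callback.run();
                return true;
            }
            timeoutCallback.run();
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Give up leadership so the next caller can be elected.
            if (acquired) {
                latch.release();
            }
        }
    }

    public static void main(String[] args) {
        LeaderLatchSketch sketch = new LeaderLatchSketch();
        boolean ran = sketch.executeInLeader(
                () -> System.out.println("running failover as leader"),
                1, TimeUnit.MINUTES,
                () -> System.out.println("timed out waiting for leadership"));
        System.out.println("callback ran: " + ran);
    }
}
```

Whoever obtains the latch runs the failover callback; a caller that cannot obtain it within the wait limit falls back to the timeout callback instead of executing the shard a second time.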