How Vipshop Saturn executes a job shard

董良策
2023-12-01

Taking a Java job as an example, the execution of a single job always starts from the execute function of public abstract class AbstractElasticJob.
execute itself is triggered from run in public class SaturnWorker implements Runnable.

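The execute function is mainly responsible for:

  1. Calling the real execution function executeJobInternal(shardingContext) once all preconditions pass (including completion of the console re-shard)
  2. Deciding whether failover handling is needed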

The key code is as follows:

executeJobInternal(shardingContext);

if (isFailoverSupported() && configService.isFailover() && !stopped && !forceStopped && !aborted) {
	failoverService.failoverIfNecessary();
}

As its name suggests, executeJobInternal handles the internal execution, including failover of failed shards.
Its finally block contains the following handling:

					if (isFailoverSupported() && configService.isFailover()) {
						failoverService.updateFailoverComplete(item);
					}
				}
			}
			runDownStream(shardingContext);

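That is, it sets the failover-complete flag (other nodes will in fact check again whether it has completed) and invokes the downstream passive jobs. Note that this is in the finally block, so it runs whether the job shard itself succeeded or failed. In practice, however, the downstream job is not executed if any shard failed, because
runDownStream contains the following code: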

if (!mayRunDownStream(shardingContext)) {
	return;
}

AbstractSaturnJob overrides mayRunDownStream so that a single failed shard is enough to keep the downstream job from being triggered.
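The gating described above can be illustrated with a small toy model. This is only a sketch, not Saturn's code: the class DownstreamGateSketch, the enum ShardResult and the field downstreamTriggered are hypothetical names introduced for illustration, and the real mayRunDownStream works on Saturn's own sharding context and return types.

```java
import java.util.Map;

/** Toy model (hypothetical, not Saturn code): downstream is attempted in finally but gated on shard results. */
class DownstreamGateSketch {
    /** Hypothetical stand-in for a per-shard outcome. */
    enum ShardResult { SUCCESS, FAILURE }

    /** Records whether the downstream job would have been triggered. */
    boolean downstreamTriggered = false;

    /** Same idea as the override described above: any failed shard blocks the downstream job. */
    boolean mayRunDownStream(Map<Integer, ShardResult> results) {
        for (ShardResult result : results.values()) {
            if (result != ShardResult.SUCCESS) {
                return false;
            }
        }
        return true;
    }

    /** Always called, but returns early when the gate is closed. */
    void runDownStream(Map<Integer, ShardResult> results) {
        if (!mayRunDownStream(results)) {
            return;
        }
        downstreamTriggered = true;
    }

    /** The finally block runs on success and on failure alike; the gate decides the outcome. */
    void execute(Map<Integer, ShardResult> results) {
        try {
            // The shard work itself would run here.
        } finally {
            runDownStream(results);
        }
    }
}
```

With results {0=SUCCESS, 1=SUCCESS} the downstream flag ends up set; with {0=SUCCESS, 1=FAILURE} runDownStream is still called from finally but returns early.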

A job shard is not executed directly in this thread; it is submitted to an executor service queue instead.
Below is the thread stack at the point of submission:

Thread [executor-2_DemoJob-saturnWorker] (Suspended (breakpoint at line 128 in SaturnJavaJob))	
	owns: HashMap<K,V>  (id=397)	
	SaturnJavaJob.handleJob(SaturnExecutionContext) line: 128	
	SaturnJavaJob(AbstractSaturnJob).executeJob(JobExecutionMultipleShardingContext) line: 62	
	SaturnJavaJob(AbstractElasticJob).executeJobInternal(JobExecutionMultipleShardingContext) line: 203	
	SaturnJavaJob(AbstractElasticJob).execute(Triggered) line: 183	
	SaturnWorker.run() line: 173	
	Executors$RunnableAdapter<T>.call() line: 511	
	FutureTask<V>.run() line: 266	
	ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149	
	ThreadPoolExecutor$Worker.run() line: 624	
	Thread.run() line: 748	

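A Java job that times out can be force-killed. Roughly, this is implemented in protected Map<Integer, SaturnJobReturn> handleJob of public class SaturnJavaJob extends AbstractSaturnJob:
when the job shard's execution task is submitted, a timeout task is started at the same time. The code is as follows: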

if (timeoutSeconds > 0) {
	TimeoutSchedulerExecutor.scheduleTimeoutJob(shardingContext.getExecutorName(), timeoutSeconds,
			shardingItemFutureTask);
}

The timeout handling code is mainly in private static class TimeoutHandleTask implements Runnable.
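The general pattern of submitting a task and scheduling a watchdog alongside it can be sketched with plain java.util.concurrent. This is a minimal sketch under stated assumptions: TimeoutKillSketch and runWithTimeout are hypothetical names, and the watchdog here simply calls Future.cancel(true), which interrupts the worker thread. Saturn's own TimeoutHandleTask may stop the shard differently.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: run a task on a worker thread and cancel it if it exceeds a timeout. */
class TimeoutKillSketch {

    /** Returns true if the task finished normally, false if it was cancelled, failed or interrupted. */
    static boolean runWithTimeout(Runnable shardTask, long timeoutMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        try {
            // Submit the shard work, then schedule the timeout task next to it.
            Future<?> future = worker.submit(shardTask);
            if (timeoutMillis > 0) {
                watchdog.schedule(() -> { future.cancel(true); }, timeoutMillis, TimeUnit.MILLISECONDS);
            }
            try {
                future.get();
                return true;
            } catch (CancellationException e) {
                return false; // the watchdog cancelled the task
            } catch (ExecutionException e) {
                return false; // the task itself threw
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        } finally {
            // Release both pools; this also drops a watchdog that has not fired yet.
            worker.shutdownNow();
            watchdog.shutdownNow();
        }
    }
}
```

A task that cooperates with interruption (for example one blocked in Thread.sleep) stops promptly when cancelled. A task that ignores interrupts keeps running in the background, which is one reason a real implementation may need a stronger kill mechanism.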

Job failover handling lives mainly under package com.vip.saturn.job.internal.failover.
When public class FailoverListenerManager starts (public void start()), it adds a listener on the job's execution path. The code is as follows:

zkCacheManager.addTreeCacheListener(new ExecutionPathListener(), executionPath, 1);

When shards change, this listener adds two more listeners, one on the item's running child node and one on its failover child node. The code is as follows:

switch (event.getType()) {
	case NODE_ADDED:
		zkCacheManager.addNodeCacheListener(new RunningPathListener(item), runningPath);
		runningAndFailoverPath.add(runningPath);
		zkCacheManager.addNodeCacheListener(new FailoverPathJobListener(item), failoverPath);
		runningAndFailoverPath.add(failoverPath);
		break;
	case NODE_REMOVED:
		zkCacheManager.closeNodeCache(runningPath);
		runningAndFailoverPath.remove(runningPath);
		zkCacheManager.closeNodeCache(failoverPath);
		runningAndFailoverPath.remove(failoverPath);
		break;
	default:
}

For both class ExecutionPathListener extends AbstractJobListener and class FailoverPathJobListener implements NodeCacheListener,
failover is in fact triggered by the item being deleted. The specific code is as follows:

if (!executionService.isRunning(item)) {
	failover(item);
}
if (!executionService.isFailover(item)) {
	failover(item);
}

Note: every executor node triggers the failover check for every shard.

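The private synchronized void failover function creates the node for a shard awaiting failover under leader/failover/items.
In addition, if none of the local shards is currently running, it tries to re-execute the failed job shard. The code is as follows: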

failoverService.createCrashedFailoverFlag(item);

if (!executionService.hasRunningItems(jobScheduler.getShardingService().getLocalHostShardingItems())) {
	failoverService.failoverIfNecessary();
}

Before failover actually runs, a leader election is performed so that multiple executors do not execute the same shard twice. This code is in public void failoverIfNecessary() of public class FailoverService:

public void failoverIfNecessary() {
	if (!needFailover()) {
		return;
	}
	// leader/failover/latch ensures that only one node executes the failover job shard
	getJobNodeStorage()
			.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback(), 1, TimeUnit.MINUTES,
					new FailoverTimeoutLeaderExecutionCallback());
}

Note the executeInLeader call in the code above.
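The shape of that call (a latch, a callback to run as leader, a wait limit, and a callback for the timeout case) can be mimicked inside one JVM. The following is only an in-process analogy, assuming the call means "wait up to the timeout to become leader, run the callback if elected, otherwise run the timeout callback". It uses a Semaphore where Saturn uses a ZooKeeper latch node, and LeaderLatchSketch and its members are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** In-process analogy (hypothetical): one permit stands in for the leader/failover/latch node. */
class LeaderLatchSketch {
    /** Only the holder of the single permit is the "leader". */
    final Semaphore latch = new Semaphore(1);

    /** Returns true if callback ran as leader, false if the wait timed out or was interrupted. */
    boolean executeInLeader(Runnable callback, long timeout, TimeUnit unit, Runnable timeoutCallback) {
        boolean acquired = false;
        try {
            acquired = latch.tryAcquire(timeout, unit);
            if (acquired) {
                callback.run();        // only one caller at a time gets here
                return true;
            }
            timeoutCallback.run();     // leadership was not obtained in time
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            if (acquired) {
                latch.release();       // give up leadership after the callback
            }
        }
    }
}
```

In this analogy, two executors calling executeInLeader at the same moment would run their callbacks one after the other, never concurrently, and a caller that waits longer than the limit falls through to its timeout callback instead.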
