/**
 * Minimal ElasticJob-Lite bootstrap modelled on the official example: it connects to a
 * ZooKeeper-backed registry center, builds the configuration of one simple job and
 * initialises a {@code JobScheduler} for it.
 */
public class App {

    /** Comma-separated {@code host:port} list handed to the ZooKeeper configuration. */
    private static final String SERVERLIST = "192.168.154.100:2181,192.168.154.102:2181,192.168.154.104:2181";

    /**
     * Creates the registry center and the job configuration, then initialises the scheduler.
     *
     * @param args command-line arguments (not used)
     * @throws SchedulerException retained so the method signature stays unchanged
     */
    public static void main(String[] args) throws SchedulerException {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }

    /**
     * Builds and initialises the registry center using the {@code movie_data_test} namespace.
     *
     * @return an initialised registry center
     */
    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter =
                new ZookeeperRegistryCenter(new ZookeeperConfiguration(SERVERLIST, "movie_data_test"));
        regCenter.init();
        return regCenter;
    }

    /**
     * Assembles the job configuration in three layers: core settings, SIMPLE job type,
     * and the Lite root configuration.
     *
     * @return the Lite root configuration for the job
     */
    private static LiteJobConfiguration createJobConfiguration() {
        // Core settings: job name, cron expression (every 15 seconds) and 3 sharding items.
        JobCoreConfiguration simpleCoreConfig =
                JobCoreConfiguration.newBuilder("MyJob", "0/15 * * * * ?", 3).build();
        // SIMPLE job type bound to the implementation class.
        SimpleJobConfiguration simpleJobConfig =
                new SimpleJobConfiguration(simpleCoreConfig, MyJob2.class.getCanonicalName());
        // Lite root configuration wrapping the job type configuration.
        return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
    }
}
上面是一段例子,仿照官网上面给出的例子。
本文主要解决这么几个问题。
ElasticJob如何运作?
ElasticJob分片如何实现?
ElasticJob所谓的无中心化是什么意思?
这个问题在前面的章节中已经讲过的,但是这里还得做进一步的说明,因为之前的部分看的比较简单。
首先,要清楚的是在创建Scheduler的时候,需要先创建连接,设置任务的参数,设置事件信息等等然后再统一完成配置信息,然后调用init方法。
/**
 * Creates a job scheduler from an event configuration.
 *
 * <p>The event configuration is wrapped in a new {@code JobEventBus} and everything is
 * forwarded to the private constructor.
 *
 * @param regCenter registry center passed through to the private constructor
 * @param liteJobConfig job configuration passed through to the private constructor
 * @param jobEventConfig event configuration used to build the {@code JobEventBus}
 * @param elasticJobListeners optional job listeners
 */
public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig,
final ElasticJobListener... elasticJobListeners) {
this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
}
/**
 * Registers a fresh {@code JobInstance} for the job, stores the collaborators and
 * builds the two facades used later by {@code init()} and by job execution.
 *
 * @param regCenter registry center shared by both facades
 * @param liteJobConfig job configuration; its job name keys the registry entry
 * @param jobEventBus event bus handed to the job facade
 * @param elasticJobListeners optional job listeners
 */
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
    final JobRegistry registry = JobRegistry.getInstance();
    final String jobName = liteJobConfig.getJobName();
    registry.addJobInstance(jobName, new JobInstance());

    this.regCenter = regCenter;
    this.liteJobConfig = liteJobConfig;

    // The listeners receive the guarantee service before the scheduler facade sees them.
    final List<ElasticJobListener> listeners = Arrays.asList(elasticJobListeners);
    setGuaranteeServiceForElasticJobListeners(regCenter, listeners);

    this.schedulerFacade = new SchedulerFacade(regCenter, jobName, listeners);
    // The job facade gets its own list view over the same listener array.
    this.jobFacade = new LiteJobFacade(regCenter, jobName, Arrays.asList(elasticJobListeners), jobEventBus);
}
主要是封装了Job的配置信息,以及创建了ElasticJob中job的一个实例对象,该对象由专门的ip+端口的方式来标识。
/**
 * Initialises the job.
 *
 * <p>Pushes the local configuration through the scheduler facade, records the sharding
 * total count, wraps a scheduler and job detail in a {@code JobScheduleController},
 * registers the job and its start-up information, and finally schedules it with the
 * configured cron expression.
 */
public void init() {
    final LiteJobConfiguration effectiveConfig = schedulerFacade.updateJobConfiguration(liteJobConfig);
    final String jobName = effectiveConfig.getJobName();

    final int shardingTotalCount = effectiveConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, shardingTotalCount);

    final JobScheduleController controller = new JobScheduleController(
            createScheduler(),
            createJobDetail(effectiveConfig.getTypeConfig().getJobClass()),
            jobName);
    JobRegistry.getInstance().registerJob(jobName, controller, regCenter);

    // Start-up info is registered as "enabled" unless the configuration is disabled.
    final boolean enabled = !effectiveConfig.isDisabled();
    schedulerFacade.registerStartUpInfo(enabled);

    controller.scheduleJob(effectiveConfig.getTypeConfig().getCoreConfig().getCron());
}
初始化作业的时候,先是获取到相关的配置,然后注册任务的分片信息。然后封装为JobScheduleController类的实例,这个实例中实际上包含了Scheduler和JobDetail的数据。当然后期调用的话,也会通过这个Scheduler类的实例来调用Quartz的内部方法,完成任务的触发等工作。
/**
 * Holder for the scheduling objects of one job.
 * NOTE(review): this excerpt shows only the field declarations; the rest of the class is not visible here.
 */
public final class JobScheduleController {
// Scheduler instance supplied at construction.
private final Scheduler scheduler;
// Job detail supplied alongside the scheduler.
private final JobDetail jobDetail;
// Trigger identity string — presumably used to name the trigger; confirm against the full class.
private final String triggerIdentity;
然后重点是后面这个注册作业信息。
SchedulerFacade.registerStartUpInfo方法
门面模式
/**
 * Registers the job's start-up information.
 *
 * <p>Runs the start-up steps in a fixed order: start all listeners, elect a leader,
 * persist server and instance online state, set the resharding flag, start the monitor
 * service, and start the reconcile service if it is not already running.
 *
 * @param enabled whether the job is enabled
 */
public void registerStartUpInfo(final boolean enabled) {
// Listeners are started before any state is written.
listenerManager.startAllListeners();
leaderService.electLeader();
serverService.persistOnline(enabled);
instanceService.persistOnline();
// Flag that sharding has to be (re)computed.
shardingService.setReshardingFlag();
monitorService.listen();
// Start the reconcile service only once.
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
首先是ListenerManager.startAllListeners()方法。
/**
 * Starts all listeners.
 *
 * <p>Starts each listener manager in turn (election, sharding, failover, monitor
 * execution, shutdown, trigger, reschedule, guarantee) and finally adds the registry
 * center connection-state listener to the job node storage.
 */
public void startAllListeners() {
electionListenerManager.start();
shardingListenerManager.start();
failoverListenerManager.start();
monitorExecutionListenerManager.start();
shutdownListenerManager.start();
triggerListenerManager.start();
rescheduleListenerManager.start();
guaranteeListenerManager.start();
// Connection-state changes are observed through the job node storage.
jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}
这些监听器实际上是对Zookeeper节点的信息进行监听,一旦节点信息变动了,那么就会触发相应的操作。
主要的几个实现了:
1. 选举主节点监听,监听的是当前有没有主节点。没有就发起选举了。
2. 分片监听器,监听节点的变化,并把标识位设置为需要重新分片;这个标识位实际上是zk上面的一个节点,即necessary节点。
3. 任务失败监听器。
4. 执行的时候开启端口,可以用来dump出相关数据。monitorExecution
5. 关闭监听,这个估计是前端console中发送的事件实现的监听。
6. 触发器监听,这个地方也是前端console中进行触发的,手动实现触发。
7. 重新调度监听,目前不是很清楚,大概率也是前端进行触发的监听器。
8. 保证分布式任务全部开始或者全部结束的监听器。
9. 监听连接状态的。
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
比如ElectionListenerManager
/**
 * Registers the two election-related data listeners: one that triggers leader election
 * and one that makes the current leader abdicate.
 */
@Override
public void start() {
addDataListener(new LeaderElectionJobListener());
addDataListener(new LeaderAbdicationJobListener());
}
/**
 * Data listener that starts a leader election when either an "active" or a "passive"
 * election condition holds and the job has not been shut down.
 */
class LeaderElectionJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (JobRegistry.getInstance().isShutdown(jobName)) {
            return;
        }
        if (isActiveElection(path, data) || isPassiveElection(path, eventType)) {
            leaderService.electLeader();
        }
    }

    /** Active election: there is no leader and the local server node is not disabled. */
    private boolean isActiveElection(final String path, final String data) {
        if (leaderService.hasLeader()) {
            return false;
        }
        return isLocalServerEnabled(path, data);
    }

    /** Passive election: the leader node was removed and the local server is available. */
    private boolean isPassiveElection(final String path, final Type eventType) {
        if (!isLeaderCrashed(path, eventType)) {
            return false;
        }
        return serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp());
    }

    /** True when the event is a removal of the leader instance node. */
    private boolean isLeaderCrashed(final String path, final Type eventType) {
        if (!leaderNode.isLeaderInstancePath(path)) {
            return false;
        }
        return Type.NODE_REMOVED == eventType;
    }

    /** True when the path is the local server node and its data is not DISABLED. */
    private boolean isLocalServerEnabled(final String path, final String data) {
        if (!serverNode.isLocalServerPath(path)) {
            return false;
        }
        return !ServerStatus.DISABLED.name().equals(data);
    }
}
/**
 * Data listener that removes the leader record when this node is the leader and its
 * local server node has been set to DISABLED.
 */
class LeaderAbdicationJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!leaderService.isLeader()) {
            return;
        }
        if (isLocalServerDisabled(path, data)) {
            leaderService.removeLeader();
        }
    }

    /** True when the path is the local server node and its data equals DISABLED. */
    private boolean isLocalServerDisabled(final String path, final String data) {
        if (!serverNode.isLocalServerPath(path)) {
            return false;
        }
        return ServerStatus.DISABLED.name().equals(data);
    }
}
实际上是通过监听zk的数据变更(dataChanged)事件来实现监听功能的。
接下来看下ShardingListenerManager。
/**
 * Registers the two sharding-related data listeners: one reacting to a changed sharding
 * total count and one reacting to server/instance changes.
 */
@Override
public void start() {
addDataListener(new ShardingTotalCountChangedJobListener());
addDataListener(new ListenServersChangedJobListener());
}
/**
 * Data listener that requests resharding when the sharding total count stored in the
 * job configuration differs from the count cached in the {@code JobRegistry}.
 */
class ShardingTotalCountChangedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        // Only configuration-node events matter, and only once a count has been recorded.
        if (!configNode.isConfigPath(path) || 0 == JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
            return;
        }
        final int updatedCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
        if (updatedCount == JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
            return;
        }
        shardingService.setReshardingFlag();
        JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, updatedCount);
    }
}
/**
 * Data listener that requests resharding when an instance node is added or removed, or
 * when any server node changes, provided the job has not been shut down.
 */
class ListenServersChangedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (JobRegistry.getInstance().isShutdown(jobName)) {
            return;
        }
        if (isInstanceChange(eventType, path) || isServerChange(path)) {
            shardingService.setReshardingFlag();
        }
    }

    /** True for any event on an instance path except a plain node update. */
    private boolean isInstanceChange(final Type eventType, final String path) {
        if (!instanceNode.isInstancePath(path)) {
            return false;
        }
        return Type.NODE_UPDATED != eventType;
    }

    /** True when the path belongs to a server node. */
    private boolean isServerChange(final String path) {
        return serverNode.isServerPath(path);
    }
}
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
监听器启动完成后就开始一些其他的工作了。
/**
 * Registers the job's start-up information.
 *
 * <p>After the listeners are started, the remaining steps run in order: leader election,
 * persisting server and instance online state, setting the resharding flag, starting the
 * monitor service, and starting the reconcile service if it is not already running.
 *
 * @param enabled whether the job is enabled
 */
public void registerStartUpInfo(final boolean enabled) {
listenerManager.startAllListeners();
// Leader election comes immediately after the listeners are up.
leaderService.electLeader();
serverService.persistOnline(enabled);
instanceService.persistOnline();
shardingService.setReshardingFlag();
monitorService.listen();
// The reconcile service is started at most once.
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
LeaderService调用electLeader方法进行leader选举。
/**
 * Elects the leader node.
 *
 * <p>Runs a {@code LeaderElectionExecutionCallback} through
 * {@code jobNodeStorage.executeInLeader} using the {@code LeaderNode.LATCH} node, logging
 * at debug level before and after.
 */
public void electLeader() {
log.debug("Elect a new leader now.");
jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
log.debug("Leader election completed.");
}
JobNodeStorage中又调用了executeInLeader方法。
/**
 * Executes an operation on the leader node.
 *
 * <p>Opens a {@code LeaderLatch} on the full path of {@code latchNode}, starts it, waits on
 * it, and then runs the callback. The latch is closed by the try-with-resources statement.
 * Any exception is passed to {@code handleException}.
 *
 * @param latchNode job node name used for the distributed lock
 * @param callback callback that performs the operation
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
// Blocks until await() returns; only then is the callback executed.
latch.await();
callback.execute();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
handleException(ex);
}
}
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
以上就是选举的代码
下面的几个方法分别完成任务的注册,任务信息的注册,还有分片的注册等等。
最终这个方法执行结束后就是注册了一堆数据到zk中去了。
然后调用scheduleJob方法进行调度,这一步调用scheduleJob方法的时候就已经进入了Quartz中去运行了。
然而这个调度的过程并不是要在当前进行启动,因为你在建立作业的时候就已经创建了一个Thread并启动来运行了,当然,刚刚创建的线程会阻塞在while中等待被唤醒。
这个地方在调用的时候没有直接进行调用而是间接的将已经启动起来的线程激活,让其退出等待状态转为运行状态。这个线程就是QuartzSchedulerThread。
/**
 * <p>
 * The main processing loop of the <code>QuartzSchedulerThread</code>.
 * </p>
 *
 * <p>
 * Each iteration: waits while paused, blocks for an available worker thread, acquires the
 * next trigger from the job store, waits on <code>sigLock</code> until its fire time,
 * marks it as fired, wraps it in a <code>JobRunShell</code> and hands that shell to the
 * thread pool. When no trigger was acquired, the thread waits for a randomized idle
 * period. The loop ends once <code>halted</code> is set.
 * </p>
 */
public void run() {
// Remembers whether the previous acquire attempt failed, so the error is reported only once in a row.
boolean lastAcquireFailed = false;
while (!halted) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted) {
break;
}
}
int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availTreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
Trigger trigger = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// Ask the job store for the next trigger due no later than now + idleWaitTime.
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime);
lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occured while scanning for the next trigger to fire.",
jpe);
}
lastAcquireFailed = true;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
}
if (trigger != null) {
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
// Wait on sigLock until the fire time is reached, re-checking for schedule changes after each wake-up.
while(timeUntilTrigger > 0) {
synchronized(sigLock) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
if (isScheduleChanged()) {
if(isCandidateNewTimeEarlierWithinReason(triggerTime)) {
// above call does a clearSignaledSchedulingChange()
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(
ctxt, trigger);
} catch (JobPersistenceException jpe) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'",
jpe);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
} catch (RuntimeException e) {
getLog().error(
"releaseTriggerRetryLoop: RuntimeException "
+e.getMessage(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
// The acquired trigger was given back; leave the wait loop and start over.
trigger = null;
break;
}
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
if(trigger == null)
continue;
// set trigger to 'executing'
TriggerFiredBundle bndle = null;
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted;
}
if(goAhead) {
try {
bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while firing trigger '"
+ trigger.getFullName() + "'", se);
} catch (RuntimeException e) {
getLog().error(
"RuntimeException while firing trigger " +
trigger.getFullName(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
}
// it's possible to get 'null' if the trigger was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'", se);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
continue;
}
// TODO: improvements:
//
// 2- make sure we can get a job runshell before firing trigger, or
// don't let that throw an exception (right now it never does,
// but the signature says it can).
// 3- acquire more triggers at a time (based on num threads available?)
JobRunShell shell = null;
try {
// Borrow a run shell and bind it to the fired-trigger bundle.
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {
try {
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
errorTriggerRetryLoop(bndle);
}
continue;
}
// Hand the shell to the thread pool; a false return is treated as an error.
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
try {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
}
continue;
}
} else { // if(availTreadCount > 0)
continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
}
// Reached only when no trigger was acquired: wait for a randomized idle period on sigLock.
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
sigLock.wait(timeUntilContinue);
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occured in main trigger firing loop.", re);
}
} // loop...
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
中间有一个JobRunShell的类,这个部分创建了一个shell实例,这里在initialize方法中调用的时候会初始化一个类LiteJob类,内部是用的反射来实现的。
/**
 * Lite scheduled job.
 *
 * <p>A Quartz {@code Job} whose {@code execute} obtains an executor from
 * {@code JobExecutorFactory} for the injected job and facade and runs it.
 *
 * @author zhangliang
 */
public final class LiteJob implements Job {
// The job implementation; a setter is generated by @Setter.
@Setter
private ElasticJob elasticJob;
// Facade handed to the executor; a setter is generated by @Setter.
@Setter
private JobFacade jobFacade;
/**
 * Looks up the executor for {@code elasticJob} and {@code jobFacade} and executes it.
 * The {@code context} argument is not read.
 */
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
会将这个类中的属性进行填充,ElasticJob的实例填充进来,以及jobFacade的实例填充进来。
qsRsrcs.getThreadPool().runInThread(shell)
调用threadPool来运行。
进入了SimpleThreadPool运行。
这个地方实际上已经交给了线程池使用的是LinkedList来存放thread运行。
/**
 * <p>
 * Run the given <code>Runnable</code> object in the next available
 * <code>Thread</code>. If while waiting the thread pool is asked to
 * shut down, the Runnable is executed immediately within a new additional
 * thread.
 * </p>
 *
 * @param runnable
 * the <code>Runnable</code> to be added.
 * @return <code>false</code> when <code>runnable</code> is <code>null</code>,
 * otherwise <code>true</code>.
 */
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
// Marks that a hand-off is in progress; cleared again before the lock is released.
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
// Move the first available worker to the busy list and give it the runnable.
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);// the worker thread is already running, so no new thread needs to be created here.
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();// a new worker thread is created, added to the pool's lists and then started
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
看了下wt.run(runnable)方法。
/**
 * Hands a runnable to this worker: stores it in the <code>runnable</code> field under
 * <code>lock</code> and wakes up all threads waiting on that lock.
 *
 * @param newRunnable the runnable to hand over
 * @throws IllegalStateException if a runnable is already set
 */
public void run(Runnable newRunnable) {
synchronized(lock) {
if(runnable != null) {
throw new IllegalStateException("Already running a Runnable!");
}
runnable = newRunnable;
// Wake the thread waiting on lock so it can pick up the new runnable.
lock.notifyAll();
}
}
wt.run()方法来运行的话
/**
 * <p>
 * Loop, executing targets as they are received.
 * </p>
 *
 * <p>
 * While the <code>run</code> flag is set, waits on <code>lock</code> (in 500 ms slices)
 * for a runnable, executes it, then clears the field, restores the thread priority and
 * either stops (<code>runOnce</code>) or makes itself available again.
 * </p>
 */
@Override
public void run() {
// True when the current iteration actually executed a runnable.
boolean ran = false;
while (run.get()) {
try {
synchronized(lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
runnable.run();// invoke the runnable's run method
}
}
} catch (InterruptedException unblock) {
// do nothing (loop will terminate if shutdown() was called
try {
getLog().error("Worker thread was interrupt()'ed.", unblock);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} catch (Throwable exceptionInRunnable) {
try {
getLog().error("Error while executing the Runnable: ",
exceptionInRunnable);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} finally {
// Always clear the hand-off slot so the next runnable can be accepted.
synchronized(lock) {
runnable = null;
}
// repair the thread in case the runnable mucked it up...
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}
if (runOnce) {
run.set(false);
clearFromBusyWorkersList(this);
} else if(ran) {
ran = false;
makeAvailable(this);
}
}
}
//if (log.isDebugEnabled())
try {
getLog().debug("WorkerThread is shut down.");
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
}
}
这里runnable实际就是刚才传进来的shell的实例。接下来就调用这个run方法了。
/**
 * Executes the job held by the execution context <code>jec</code>.
 *
 * <p>Inside a do/while loop: calls <code>begin()</code>, notifies listeners (handling a
 * veto), invokes <code>job.execute(jec)</code>, records the run time, notifies job and
 * trigger listeners, and then either re-executes the job (when the trigger asks for it)
 * or completes and reports completion to the job store. The shell is always returned to
 * its factory in the <code>finally</code> block.
 */
public void run() {
try {
Trigger trigger = jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();
do {
JobExecutionException jobExEx = null;
Job job = jec.getJobInstance();
try {
begin();
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getFullName()
+ ": couldn't begin execution.", se);
break;
}
// notify job & trigger listeners...
try {
if (!notifyListenersBeginning(jec)) {
break;
}
} catch(VetoedException ve) {
// Execution was vetoed: let the trigger finish without a job result and stop.
try {
int instCode = trigger.executionComplete(jec, null);
try {
qs.notifyJobStoreJobVetoed(schdCtxt, trigger, jobDetail, instCode);
} catch(JobPersistenceException jpe) {
vetoedJobRetryLoop(trigger, jobDetail, instCode);
}
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error during veto of Job ("
+ jec.getJobDetail().getFullName()
+ ": couldn't finalize execution.", se);
}
break;
}
long startTime = System.currentTimeMillis();
long endTime = startTime;
// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getFullName());
job.execute(jec);// executes the user-defined job.
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getFullName() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
// Any other throwable is wrapped so that listeners and the trigger still see a JobExecutionException.
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getFullName() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
se.setErrorCode(SchedulerException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getFullName()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
jobExEx.setErrorCode(JobExecutionException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
}
jec.setJobRunTime(endTime - startTime);
// notify all job listeners
if (!notifyJobListenersComplete(jec, jobExEx)) {
break;
}
int instCode = Trigger.INSTRUCTION_NOOP;
// update the trigger
try {
instCode = trigger.executionComplete(jec, jobExEx);
} catch (Exception e) {
// If this happens, there's a bug in the trigger...
SchedulerException se = new SchedulerException(
"Trigger threw an unhandled exception.", e);
se.setErrorCode(SchedulerException.ERR_TRIGGER_THREW_EXCEPTION);
qs.notifySchedulerListenersError(
"Please report this error to the Quartz developers.",
se);
}
// notify all trigger listeners
if (!notifyTriggerListenersComplete(jec, instCode)) {
break;
}
// update job/trigger or re-execute job
if (instCode == Trigger.INSTRUCTION_RE_EXECUTE_JOB) {
jec.incrementRefireCount();
try {
complete(false);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getFullName()
+ ": couldn't finalize execution.", se);
}
// Go round the loop again to re-execute the job.
continue;
}
try {
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getFullName()
+ ": couldn't finalize execution.", se);
continue;
}
try {
qs.notifyJobStoreJobComplete(schdCtxt, trigger, jobDetail,
instCode);
} catch (JobPersistenceException jpe) {
qs.notifySchedulerListenersError(
"An error occured while marking executed job complete. job= '"
+ jobDetail.getFullName() + "'", jpe);
if (!completeTriggerRetryLoop(trigger, jobDetail, instCode)) {
return;
}
}
break;
} while (true);
} finally {
// The shell is handed back to the factory on every exit path.
jobRunShellFactory.returnJobRunShell(this);
}
}
注意中间有个execute方法。
这个方法调用的时候就会调用LiteJob中的execute方法
/**
 * Quartz {@code Job} whose {@code execute} delegates to the executor that
 * {@code JobExecutorFactory} returns for the injected job and facade.
 */
public final class LiteJob implements Job {
// The job implementation; a setter is generated by @Setter.
@Setter
private ElasticJob elasticJob;
// Facade handed to the executor; a setter is generated by @Setter.
@Setter
private JobFacade jobFacade;
// Obtains the matching executor and runs it; the context argument is not read.
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
JobExecutorFactory调用的getJobExecutor方法。
/**
 * Returns the executor that matches the given job.
 *
 * <p>A {@code null} job selects the script executor; {@code SimpleJob} and
 * {@code DataflowJob} implementations select their dedicated executors, checked in that
 * order.
 *
 * @param elasticJob the distributed elastic job, or {@code null} for a script job
 * @param jobFacade facade for the job's internal services
 * @return the job executor
 * @throws JobConfigurationException if the job type is not supported
 */
@SuppressWarnings("unchecked")
public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
    final AbstractElasticJobExecutor executor;
    if (null == elasticJob) {
        executor = new ScriptJobExecutor(jobFacade);
    } else if (elasticJob instanceof SimpleJob) {
        executor = new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
    } else if (elasticJob instanceof DataflowJob) {
        executor = new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
    } else {
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }
    return executor;
}
这里会返回一个SimpleJobExecutor的对象。
接下来是执行作业的步骤。
进入AbstractElasticJobExecutor中,调用其execute方法。
/**
 * Executes the job.
 *
 * <p>Steps, in order: check the execution environment, obtain the sharding contexts,
 * record a misfire and return if the previous run is still active, run the
 * before-execution hook, execute normally, re-execute while misfires are pending,
 * trigger failover if necessary, and run the after-execution hook. Exceptions from the
 * environment check and from both hooks are passed to {@code jobExceptionHandler}.
 */
public final void execute() {
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobExceptionHandler.handleException(jobName, cause);
}
// Sharding items this execution is responsible for.
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
}
// If misfireIfRunning reports true, this trigger is skipped and only a trace event is posted.
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
shardingContexts.getShardingItemParameters().keySet()));
}
return;
}
try {
jobFacade.beforeJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
// Keep re-executing as long as misfires are recorded for these items.
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
jobFacade.failoverIfNecessary();
try {
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
}
注意中间有一个比较重要的方法:getShardingContexts()方法。
/**
 * Builds the sharding contexts for the current execution.
 *
 * <p>With failover enabled, locally taken-over failover items win if there are any.
 * Otherwise sharding is performed if necessary and the local items are used, minus the
 * items taken off by failover and minus disabled items.
 */
@Override
public ShardingContexts getShardingContexts() {
    final boolean failoverEnabled = configService.load(true).isFailover();
    if (failoverEnabled) {
        final List<Integer> takenOverItems = failoverService.getLocalFailoverItems();
        if (!takenOverItems.isEmpty()) {
            return executionContextService.getJobShardingContext(takenOverItems);
        }
    }

    shardingService.shardingIfNecessary();
    final List<Integer> localItems = shardingService.getLocalShardingItems();
    if (failoverEnabled) {
        // Items currently being executed elsewhere via failover are excluded.
        localItems.removeAll(failoverService.getLocalTakeOffItems());
    }
    localItems.removeAll(executionService.getDisabledItems(localItems));
    return executionContextService.getJobShardingContext(localItems);
}
这里调用了ShardingService中的shardingIfNecessary方法
/**
 * Shards the job if sharding is needed and the current node is the leader.
 *
 * <p>
 * Nothing happens when there are no available job instances. A non-leader node blocks
 * until sharding has completed instead of sharding itself.
 * </p>
 */
public void shardingIfNecessary() {
    final List<JobInstance> candidates = instanceService.getAvailableJobInstances();
    if (!isNeedSharding()) {
        return;
    }
    if (candidates.isEmpty()) {
        return;
    }
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }

    waitingOtherJobCompleted();
    final LiteJobConfiguration currentConfig = configService.load(false);
    final int totalCount = currentConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    // The PROCESSING node is filled for the duration of the sharding.
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(totalCount);

    final JobShardingStrategy strategy = JobShardingStrategyFactory.getStrategy(currentConfig.getJobShardingStrategyClass());
    jobNodeStorage.executeInTransaction(
            new PersistShardingInfoTransactionExecutionCallback(strategy.sharding(candidates, jobName, totalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}
这里面会进行判断自己是不是主节点,是主节点的话就可以重新分片并将数据写到zk上面去,自己不是主节点的话就阻塞直到分片完成。
接下来就是执行任务,然后看是不是存在什么异常之类的,采取相应的措施就可以了。
其中的这部分代码就包含了失效转移,异常的处理等情况。
// Excerpt from execute(): re-run while misfires are recorded for these sharding items,
// clearing the misfire mark before each re-run.
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
// Failover is attempted after normal and misfire executions.
jobFacade.failoverIfNecessary();
try {
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
// Any failure in the after-execution hook goes to the exception handler.
jobExceptionHandler.handleException(jobName, cause);
}
这样就彻底完成一次任务的调度过程。
分片上面的分析代码过程中已经看到了,这里做一个总结。
怎么分片的?任务在初始化的时候会直接注册一个分片的标识,即necessary的路径,标识需要进行分片。但是实际分片是在任务运行的时候,也就是任务第一次运行的时候,会查看是不是需要进行分片,如果需要的话就进行分片处理,不需要就不分片。
分片后任务怎么分呢?任务不用分,因为是无中心的,所以不需要分发和领取,每个程序只需要不断的来获取任务然后看能不能执行就好了。
整体来看,分片的结果会写到zk的节点上面去,某个节点的那个线程会来检查分片的结果,与自己的id(前文中提到过ip等标识节点)进行对比,如果匹配成功,也就是说该节点拥有这个分片,可以对该分片进行处理了。然后会for循环调用job的execute来执行对应的分片的任务。这里item是会封装到ShardingContext中去的,所以你在自定义的方法里面可以获取到item的值。
LiteJobFacade类中getShardingContexts方法。
ShardingService中的。
/**
 * Returns the sharding items assigned to a job instance.
 *
 * <p>Each item index below the configured sharding total count is checked; an item
 * belongs to the instance when the data stored at the item's instance node equals the
 * instance id.
 *
 * @param jobInstanceId id of the running job instance
 * @return the instance's sharding items; empty if its server is not available
 */
public List<Integer> getShardingItems(final String jobInstanceId) {
    final JobInstance instance = new JobInstance(jobInstanceId);
    if (serverService.isAvailableServer(instance.getIp())) {
        final List<Integer> ownedItems = new LinkedList<>();
        final int totalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
        for (int item = 0; item < totalCount; item++) {
            if (instance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(item)))) {
                ownedItems.add(item);
            }
        }
        return ownedItems;
    }
    return Collections.emptyList();
}
ShardingService中挑选出自己可以执行的item的值,对应的是int类型的item的分片值,这个值也是zk路径上面的节点名。
/**
 * Builds the sharding contexts of the current job server.
 *
 * <p>Running items are removed first when execution monitoring is on. If no items
 * remain, the contexts carry an empty parameter map; otherwise they carry the parameters
 * of the assigned items.
 *
 * @param shardingItems sharding items assigned to this server
 * @return the sharding contexts
 */
public ShardingContexts getJobShardingContext(final List<Integer> shardingItems) {
    // Original author's note: this configuration is read from ZooKeeper.
    final LiteJobConfiguration jobConfig = configService.load(false);
    removeRunningIfMonitorExecution(jobConfig.isMonitorExecution(), shardingItems);

    final Map<Integer, String> assignedParameters;
    if (shardingItems.isEmpty()) {
        assignedParameters = Collections.<Integer, String>emptyMap();
    } else {
        final Map<Integer, String> allParameters =
                new ShardingItemParameters(jobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters()).getMap();
        assignedParameters = getAssignedShardingItemParameterMap(shardingItems, allParameters);
    }

    return new ShardingContexts(
            buildTaskId(jobConfig, shardingItems),
            jobConfig.getJobName(),
            jobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(),
            jobConfig.getTypeConfig().getCoreConfig().getJobParameter(),
            assignedParameters);
}
回到AbstractElasticJobExecutor类中继续进行执行。
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
/**
 * Runs one execution pass over the given sharding contexts.
 *
 * <p>Returns early (posting a TASK_FINISHED trace if allowed) when there are no sharding
 * items. Otherwise registers the job start, processes the items, and in all cases
 * registers completion and posts TASK_FINISHED or TASK_ERROR depending on whether any
 * item error messages were recorded.
 *
 * @param shardingContexts sharding contexts to execute
 * @param executionSource what caused this execution
 */
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
        }
        return;
    }

    // Step 1: register the job start.
    jobFacade.registerJobBegin(shardingContexts);
    final String taskId = shardingContexts.getTaskId();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    }

    try {
        // Step 2: process the sharding items.
        process(shardingContexts, executionSource);
    } finally {
        // TODO consider adding a job-failed state, and how to handle the overall job-failure flow
        jobFacade.registerJobCompleted(shardingContexts);
        final boolean succeeded = itemErrorMessages.isEmpty();
        if (shardingContexts.isAllowSendJobEvent()) {
            if (succeeded) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
            } else {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            }
        }
    }
}
第一步
调用LiteJobFacade类中的方法
/**
 * Registers the job start by delegating to {@code executionService.registerJobBegin}.
 *
 * @param shardingContexts sharding contexts of the execution that is starting
 */
@Override
public void registerJobBegin(final ShardingContexts shardingContexts) {
executionService.registerJobBegin(shardingContexts);
}
然后调用了ExecutionService中的方法
/**
 * Registers that the job has started.
 *
 * <p>Marks the job as running in the {@code JobRegistry}. When execution monitoring is
 * enabled, additionally fills an ephemeral "running" node for every sharding item.
 *
 * @param shardingContexts sharding contexts of the execution that is starting
 */
public void registerJobBegin(final ShardingContexts shardingContexts) {
    JobRegistry.getInstance().setJobRunning(jobName, true);
    if (configService.load(true).isMonitorExecution()) {
        for (final int item : shardingContexts.getShardingItemParameters().keySet()) {
            jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(item), "");
        }
    }
}
按照追踪的结果来看这段的目的是每次运行的时候都会创建一个/jobName/sharding/{item值}/running的节点。
第二步
实际执行的时候,这里猜到了应该会for循环每个分片的item然后进行调用。
/**
 * Processes every sharding item of the given contexts.
 *
 * <p>A single item is processed directly on the calling thread. Several items are each
 * submitted to {@code executorService}, and the caller waits on a latch until all
 * submitted items have finished.
 *
 * @param shardingContexts sharding contexts to process
 * @param executionSource what caused this execution
 */
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    final Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    if (1 == items.size()) {
        final int onlyItem = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        final JobExecutionEvent onlyEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, onlyItem);
        process(shardingContexts, onlyItem, onlyEvent);
        return;
    }

    final CountDownLatch allItemsDone = new CountDownLatch(items.size());
    for (final int item : items) {
        final JobExecutionEvent itemEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
        // NOTE(review): returning here skips the latch wait, so items already submitted are not awaited.
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(new Runnable() {

            @Override
            public void run() {
                try {
                    process(shardingContexts, item, itemEvent);
                } finally {
                    // Count down even when processing fails so the caller is always released.
                    allItemsDone.countDown();
                }
            }
        });
    }

    try {
        allItemsDone.await();
    } catch (final InterruptedException ex) {
        // Restore the interrupt flag for the caller.
        Thread.currentThread().interrupt();
    }
}
然后对每个分片项(each)调用下面这个process方法。
/**
 * Processes a single sharding item and publishes its execution events.
 *
 * <p>On success the success event is posted only if job events are allowed. On any
 * throwable the failure event is posted, the error text is recorded in
 * {@code itemErrorMessages} under the item, and the throwable is passed to
 * {@code jobExceptionHandler}.
 *
 * @param shardingContexts sharding contexts of the current execution
 * @param item sharding item to process
 * @param startEvent execution event created for this item
 */
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobExecutionEvent(startEvent);
    }
    log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
    try {
        process(new ShardingContext(shardingContexts, item));
        final JobExecutionEvent successEvent = startEvent.executionSuccess();
        log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(successEvent);
        }
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        final JobExecutionEvent failureEvent = startEvent.executionFailure(cause);
        // The failure event is posted without consulting isAllowSendJobEvent().
        jobFacade.postJobExecutionEvent(failureEvent);
        itemErrorMessages.put(item, ExceptionUtil.transform(cause));
        jobExceptionHandler.handleException(jobName, cause);
    }
}
里面的process又将each封装到ShardingContext中去,也就是他的item值就是for循环后的每个item值。
/**
 * Creates the context of one sharding item from the contexts of the whole execution.
 *
 * <p>Job-level values are copied from {@code shardingContexts}; the item's own parameter
 * is looked up in its sharding item parameter map.
 *
 * @param shardingContexts contexts shared by all items of this execution
 * @param shardingItem the sharding item this context represents
 */
public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
    // Item-specific values.
    this.shardingItem = shardingItem;
    this.shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
    // Values shared by every item of the job.
    this.jobName = shardingContexts.getJobName();
    this.taskId = shardingContexts.getTaskId();
    this.shardingTotalCount = shardingContexts.getShardingTotalCount();
    this.jobParameter = shardingContexts.getJobParameter();
}
然后可以调用到你自己定义的job了。
无中心就是没有中心点,各自处理各自的,谁也不影响谁,所有可能产生通信的数据大部分通过zk节点中的数据进行交互,从而使得整个结构不会受到彼此的影响;尤其是要保证多个进程或者线程之间直接交互并做出反应的话,开销比较大。