Spring Batch stop job and @JobScope: No context holder available

田博易
2023-12-01

Contents

Symptoms

Cause

Solution

Summary


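  • Symptoms

Recently, while testing a project built on Spring Batch, I ran into a pitfall: stopping a job never worked and always failed with a "No context holder available" error in the current thread. Whether I issued the stop on the instance that was actually running the job or on another one, the job never reached the status I expected, STOPPED. My code was: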

JobExecution jobExecution = jobExplorer.getJobExecution(jobStat.getJobExecutionId());
jobOperator.stop(jobExecution.getId());

// The idea: look up the job's jobExecutionId in my own table first, then call JobOperator.stop()

Stepping through with the debugger led me to where the exception is thrown, in SimpleJobOperator.stop():

@Override
	@Transactional
	public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {

		JobExecution jobExecution = findExecutionById(executionId);
		// Indicate the execution should be stopped by setting it's status to
		// 'STOPPING'. It is assumed that
		// the step implementation will check this status at chunk boundaries.
		BatchStatus status = jobExecution.getStatus();
		if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) {
			throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution);
		}
		jobExecution.setStatus(BatchStatus.STOPPING);
		jobRepository.update(jobExecution);

		try {
			Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
			if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object
				//get the current stepExecution
				for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
					if (stepExecution.getStatus().isRunning()) {
						try {
							//have the step execution that's running -> need to 'stop' it
							Step step = ((StepLocator)job).getStep(stepExecution.getStepName());
							if (step instanceof TaskletStep) {
								Tasklet tasklet = ((TaskletStep)step).getTasklet();
								if (tasklet instanceof StoppableTasklet) {
									StepSynchronizationManager.register(stepExecution);
									((StoppableTasklet)tasklet).stop();
									StepSynchronizationManager.release();
								}
							}
						}
						catch (NoSuchStepException e) {
							logger.warn("Step not found",e);
						}
					}
				}
			}
		}
		catch (NoSuchJobException e) {
			logger.warn("Cannot find Job object in the job registry. StoppableTasklet#stop() will not be called",e);
		}

		return true;
	}

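From the code above, the stop flow is: 1. mark the JobExecution as STOPPING and save it through the JobRepository; 2. look up the Job in the JobRegistry; 3. for every running StepExecution, locate the corresponding Step; 4. if that Step is a TaskletStep whose Tasklet is a StoppableTasklet, call stop() on it. The job itself only reaches STOPPED later, on the thread that is executing it, once the running step notices the STOPPING status.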

When I ran the stop, I found that it failed at this line:

Step step = ((StepLocator)job).getStep(stepExecution.getStepName());

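The bean behind one of my steps could not be obtained here, so an exception ("No context holder available ...") was thrown as soon as the loop reached this line. The surrounding catch only handles NoSuchStepException, so this exception propagated out of the @Transactional stop() method, the transaction was rolled back, and the STOPPING status written earlier was never persisted.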
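The rollback can be reproduced with a small plain-Java model. Everything in it is hypothetical and simplified: BatchStatus and NoSuchStepException are local stand-ins for the Spring Batch types, Repository plays the JobRepository, and the "transaction" is just a snapshot that is restored when a RuntimeException escapes, which is what @Transactional does by default.

```java
// Simplified, hypothetical model of SimpleJobOperator.stop() failing inside a transaction.
class StopRollbackModel {

    // Local stand-in for org.springframework.batch.core.BatchStatus.
    enum BatchStatus { STARTING, STARTED, STOPPING, STOPPED }

    // Local stand-in for the checked NoSuchStepException that stop() does catch.
    static class NoSuchStepException extends Exception {
        NoSuchStepException(String message) { super(message); }
    }

    // Plays the role of the JobRepository: holds the persisted job status.
    static class Repository {
        BatchStatus status = BatchStatus.STARTED;
    }

    // Models the @JobScope step lookup: without a JobContext the proxy cannot build its target.
    static void locateStep(String stepName) throws NoSuchStepException {
        throw new IllegalStateException("No context holder available for job scope");
    }

    // Models the transactional stop(): the status write is undone if a RuntimeException escapes.
    static void stop(Repository repository) {
        BatchStatus snapshot = repository.status;        // state when the transaction begins
        try {
            repository.status = BatchStatus.STOPPING;     // jobRepository.update(jobExecution)
            try {
                locateStep("transMasterStep");
            } catch (NoSuchStepException e) {
                System.out.println("Step not found");     // the only exception stop() handles
            }
        } catch (RuntimeException e) {
            repository.status = snapshot;                 // rollback
            throw e;
        }
    }

    public static void main(String[] args) {
        Repository repository = new Repository();
        try {
            stop(repository);
        } catch (IllegalStateException e) {
            System.out.println("stop() failed: " + e.getMessage());
        }
        System.out.println("status after stop(): " + repository.status);
    }
}
```

Running main() prints the failure message followed by `status after stop(): STARTED`: the STOPPING write is discarded together with the transaction, which matches the symptom of the status never changing.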

  • Cause

The cause is that I had put the @JobScope annotation on the definition of the partitioned master step:

@Bean(name = StepNameConstant.TRANSACTIONS_MASTER_STEP)
@JobScope
public Step transMasterStep(StepBuilderFactory stepBuilderFactory,
                            @Value("#{jobParameters[" + JobParamConstant.PROCESSDATE + "]}") Date processDate,
                            TaskExecutorPartitionHandler partitionHandler,
                            @Qualifier(StepNameConstant.TRANS_STEP) Step transStep,
                            TransPartitioner transPartitioner,
                            TransStepListener transStepListener)

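This annotation turns the step bean into a lazily initialized dynamic proxy: the real bean is only created the first time it is used, inside the scope of a running job. When stop() looks the step up, there is no JobContext bound to the current thread, so #{jobParameters[...]} cannot be resolved and the target bean cannot be created.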
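To make the lazy-proxy behaviour concrete, here is a plain-JDK sketch using java.lang.reflect.Proxy. It is not Spring's implementation (Spring builds scoped proxies with its own AOP infrastructure and caches the target per job execution); the Step interface, the ThreadLocal CONTEXT and the processDate key are hypothetical stand-ins. The point is only that creating the proxy needs no context, while the first method call does.

```java
import java.lang.reflect.Proxy;
import java.util.Map;

// Plain-JDK sketch of a job-scoped lazy proxy; all names are hypothetical stand-ins.
class JobScopeModel {

    // Minimal stand-in for org.springframework.batch.core.Step.
    interface Step {
        String getName();
    }

    // Thread-bound "job context" holding job parameters (roughly what JobSynchronizationManager provides).
    static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    // Returns a proxy that builds its real target on each call from the current thread's context.
    static Step scopedStepProxy() {
        return (Step) Proxy.newProxyInstance(
                Step.class.getClassLoader(),
                new Class<?>[] { Step.class },
                (proxy, method, args) -> {
                    Map<String, Object> context = CONTEXT.get();
                    if (context == null) {
                        throw new IllegalStateException("No context holder available for job scope");
                    }
                    // Resolve the "#{jobParameters[processDate]}" equivalent, then create the target.
                    Object processDate = context.get("processDate");
                    Step target = () -> "transMasterStep(processDate=" + processDate + ")";
                    return method.invoke(target, args);
                });
    }

    public static void main(String[] args) {
        Step step = scopedStepProxy();                      // no context needed to create the proxy
        try {
            step.getName();                                 // first real use, no context bound
        } catch (IllegalStateException e) {
            System.out.println("without context: " + e.getMessage());
        }
        CONTEXT.set(Map.of("processDate", "2023-12-01"));   // roughly what register(jobExecution) provides
        try {
            System.out.println("with context: " + step.getName());
        } finally {
            CONTEXT.remove();                               // roughly what close() does
        }
    }
}
```

The same call, step.getName(), fails or succeeds depending only on whether the calling thread has a context bound, which is why the lookup inside stop() breaks while the job itself runs fine.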

@Override
	public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

		beanFactory.registerScope(name, this);

		if(!autoProxy) {
			return;
		}

		Assert.state(beanFactory instanceof BeanDefinitionRegistry,
				"BeanFactory was not a BeanDefinitionRegistry, so JobScope cannot be used.");
		BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;

		for (String beanName : beanFactory.getBeanDefinitionNames()) {
			if (!beanName.startsWith(getTargetNamePrefix())) {
				BeanDefinition definition = beanFactory.getBeanDefinition(beanName);
				// Replace this or any of its inner beans with scoped proxy if it
				// has this scope
				boolean scoped = name.equals(definition.getScope());
				Scopifier scopifier = new Scopifier(registry, name, proxyTargetClass, scoped);
				scopifier.visitBeanDefinition(definition);

				if (scoped && !definition.isAbstract()) {
					createScopedProxy(beanName, definition, registry, proxyTargetClass);
				}
			}
		}

	}
private <T> T createLazyProxy(AtomicReference<T> reference, Class<T> type) {
		ProxyFactory factory = new ProxyFactory();
		factory.setTargetSource(new ReferenceTargetSource<>(reference));
		factory.addAdvice(new PassthruAdvice());
		factory.setInterfaces(new Class<?>[] { type });
		@SuppressWarnings("unchecked")
		T proxy = (T) factory.getProxy();
		return proxy;
	}


private class ReferenceTargetSource<T> extends AbstractLazyCreationTargetSource {

		private AtomicReference<T> reference;

		public ReferenceTargetSource(AtomicReference<T> reference) {
			this.reference = reference;
		}

		@Override
		protected Object createObject() throws Exception {
			initialize();
			return reference.get();
		}
	}

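The code above is how the scope annotation is processed and how the proxy objects are created. So how do we actually fix the problem? For that we need two more pieces of code, the JobScopeManager aspect and AbstractJob.execute():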

@Aspect
public class JobScopeManager {

	@Around("execution(void org.springframework.batch.core.Job+.execute(*)) && target(job) && args(jobExecution)")
	public void execute(Job job, JobExecution jobExecution) {
		JobSynchronizationManager.register(jobExecution);
		try {
			job.execute(jobExecution);
		}
		finally {
			JobSynchronizationManager.release();
		}
	}

}
@Override
	public final void execute(JobExecution execution) {

		Assert.notNull(execution, "jobExecution must not be null");

		if (logger.isDebugEnabled()) {
			logger.debug("Job execution starting: " + execution);
		}

		JobSynchronizationManager.register(execution);
		LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer("job.active", "Active jobs");
		LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start();
		Timer.Sample timerSample = BatchMetrics.createTimerSample();
		try {

			jobParametersValidator.validate(execution.getJobParameters());

			if (execution.getStatus() != BatchStatus.STOPPING) {

				execution.setStartTime(new Date());
				updateStatus(execution, BatchStatus.STARTED);

				listener.beforeJob(execution);

				try {
					doExecute(execution);
					if (logger.isDebugEnabled()) {
						logger.debug("Job execution complete: " + execution);
					}
				} catch (RepeatException e) {
					throw e.getCause();
				}
			} else {

				// The job was already stopped before we even got this far. Deal
				// with it in the same way as any other interruption.
				execution.setStatus(BatchStatus.STOPPED);
				execution.setExitStatus(ExitStatus.COMPLETED);
				if (logger.isDebugEnabled()) {
					logger.debug("Job execution was stopped: " + execution);
				}

			}

		} catch (JobInterruptedException e) {
			logger.info("Encountered interruption executing job: "
					+ e.getMessage());
			if (logger.isDebugEnabled()) {
				logger.debug("Full exception", e);
			}
			execution.setExitStatus(getDefaultExitStatusForFailure(e, execution));
			execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus()));
			execution.addFailureException(e);
		} catch (Throwable t) {
			logger.error("Encountered fatal error executing job", t);
			execution.setExitStatus(getDefaultExitStatusForFailure(t, execution));
			execution.setStatus(BatchStatus.FAILED);
			execution.addFailureException(t);
		} finally {
			try {
				if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED)
						&& execution.getStepExecutions().isEmpty()) {
					ExitStatus exitStatus = execution.getExitStatus();
					ExitStatus newExitStatus =
							ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");
					execution.setExitStatus(exitStatus.and(newExitStatus));
				}

				timerSample.stop(BatchMetrics.createTimer("job", "Job duration",
						Tag.of("name", execution.getJobInstance().getJobName()),
						Tag.of("status", execution.getExitStatus().getExitCode())
				));
				longTaskTimerSample.stop();
				execution.setEndTime(new Date());

				try {
					listener.afterJob(execution);
				} catch (Exception e) {
					logger.error("Exception encountered in afterJob callback", e);
				}

				jobRepository.update(execution);
			} finally {
				JobSynchronizationManager.release();
			}

		}

	}

Both of them call JobSynchronizationManager.register(jobExecution) before the job executes, so I looked up the Javadoc of JobSynchronizationManager:

Central convenience class for framework use in managing the job scope
context. Generally only to be used by implementations of {@link Job}. N.B.
it is the responsibility of every {@link Job} implementation to ensure that
a {@link JobContext} is available on every thread that might be involved in
a job execution, including worker threads from a pool.
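The last sentence of the Javadoc is the key: the context lives in a thread-bound holder, so a thread only sees it if it was registered on that very thread. The sketch below uses a hypothetical ThreadLocal holder (not the real JobSynchronizationManager) to show that a context registered on one thread is invisible on a pool worker until the worker registers it itself. The thread that calls jobOperator.stop() is exactly such a thread with nothing registered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical thread-bound holder, loosely modelled on JobSynchronizationManager.
class ThreadBoundContextModel {

    private static final ThreadLocal<Long> CURRENT_EXECUTION_ID = new ThreadLocal<>();

    static void register(long executionId) {
        CURRENT_EXECUTION_ID.set(executionId);
    }

    static void close() {
        CURRENT_EXECUTION_ID.remove();
    }

    static Long currentExecutionId() {
        return CURRENT_EXECUTION_ID.get();
    }

    // Reads the context on a pool thread, optionally registering it there first.
    static Long readOnWorker(ExecutorService pool, boolean registerOnWorker, long executionId) {
        return CompletableFuture.supplyAsync(() -> {
            if (registerOnWorker) {
                register(executionId);
            }
            try {
                return currentExecutionId();
            } finally {
                if (registerOnWorker) {
                    close();
                }
            }
        }, pool).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        register(42L);                                   // registered on the main thread only
        try {
            System.out.println("main thread: " + currentExecutionId());
            System.out.println("worker, not registered: " + readOnWorker(pool, false, 42L));
            System.out.println("worker, registered: " + readOnWorker(pool, true, 42L));
        } finally {
            close();
            pool.shutdown();
        }
    }
}
```

The worker sees null until it registers the execution itself, mirroring the Javadoc's requirement that a context be made available on every thread involved.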
  • Solution

There are two ways to solve this problem, and I tried both:

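  1. Remove the @JobScope annotation. Removing it on its own breaks things: #{jobParameters[...]} no longer works, because outside the job scope there is no JobContext to resolve it from. The workaround is to move that parameter into a StepListener, set it there, and pass it along. This approach is not great because it requires changing quite a lot of code, so I dropped it.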
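  2. Register the JobExecution with JobSynchronizationManager before calling stop(). This solves the problem with very little code:
JobExecution jobExecution = jobExplorer.getJobExecution(jobStat.getJobExecutionId());
// Bind a JobContext for this execution to the current thread so that
// @JobScope proxies can resolve their targets inside stop()
JobSynchronizationManager.register(jobExecution);
try {
    jobOperator.stop(jobStat.getJobExecutionId());
} finally {
    // De-register the context from the current thread again
    JobSynchronizationManager.close();
}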
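  • Summary

I suspect this may be a bug in Spring Batch: the framework could perfectly well do this registration itself inside stop(). Then again, it may not be a bug; I have not fully understood the root cause. If anyone more experienced can shed some light on it, corrections and discussion are very welcome.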
