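Recently, while using the Spring Batch framework on a project, I ran into a pitfall during testing: when I tried to stop a job, it never actually stopped, and I got the error "No Context Holder available in current thread". Whether or not I issued the stop on the instance that was running the job, the job status never reached the STOPPED I expected. My code was: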
JobExecution jobExecution = jobExplorer.getJobExecution(jobStat.getJobExecutionId());
jobOperator.stop(jobExecution.getJobExecutionId());
// The idea: look up the job's jobExecutionId in my own table, then call jobOperator's stop method
Stepping through with the debugger, I found where the exception is thrown:
@Override
@Transactional
public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
    JobExecution jobExecution = findExecutionById(executionId);
    // Indicate the execution should be stopped by setting it's status to
    // 'STOPPING'. It is assumed that
    // the step implementation will check this status at chunk boundaries.
    BatchStatus status = jobExecution.getStatus();
    if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) {
        throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution);
    }
    jobExecution.setStatus(BatchStatus.STOPPING);
    jobRepository.update(jobExecution);
    try {
        Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
        if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object
            //get the current stepExecution
            for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
                if (stepExecution.getStatus().isRunning()) {
                    try {
                        //have the step execution that's running -> need to 'stop' it
                        Step step = ((StepLocator)job).getStep(stepExecution.getStepName());
                        if (step instanceof TaskletStep) {
                            Tasklet tasklet = ((TaskletStep)step).getTasklet();
                            if (tasklet instanceof StoppableTasklet) {
                                StepSynchronizationManager.register(stepExecution);
                                ((StoppableTasklet)tasklet).stop();
                                StepSynchronizationManager.release();
                            }
                        }
                    }
                    catch (NoSuchStepException e) {
                        logger.warn("Step not found",e);
                    }
                }
            }
        }
    }
    catch (NoSuchJobException e) {
        logger.warn("Cannot find Job object in the job registry. StoppableTasklet#stop() will not be called",e);
    }
    return true;
}
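From the code above, the stop flow is: 1. load the JobExecution and check that it is STARTED or STARTING; 2. set its status to STOPPING and save it through the jobRepository; 3. look up the Job in the jobRegistry; 4. for each running StepExecution, resolve the Step and, if it wraps a StoppableTasklet, call its stop() method. As the comment in the code says, the step implementation is then expected to notice the STOPPING status at a chunk boundary and stop.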
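When I ran the stop, I found that at
Step step = ((StepLocator)job).getStep(stepExecution.getStepName());
the bean behind one of my steps could not be obtained, so a null pointer exception was thrown when the loop reached this line. The catch around it only handles a different exception (NoSuchStepException), so the exception propagated out of stop(), the transaction was rolled back, and the status was never updated.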
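The effect of that narrow catch can be reproduced without the framework. The following is only a minimal sketch: `StepNotFound`, `FakeRepository` and `NarrowCatchDemo` are hypothetical stand-ins rather than Spring Batch classes, and the "transaction" is simulated by restoring the old value when an unchecked exception escapes (Spring's `@Transactional` rolls back on unchecked exceptions by default).

```java
// Hypothetical stand-ins; none of these types come from Spring Batch.
class StepNotFound extends Exception {
    StepNotFound(String message) { super(message); }
}

class FakeRepository {
    String status = "STARTED";
}

class NarrowCatchDemo {
    // Simulates looking up a step whose bean cannot be created.
    static Object lookupStep(boolean beanAvailable) throws StepNotFound {
        if (!beanAvailable) {
            // An unchecked exception, unrelated to StepNotFound.
            throw new IllegalStateException("no context available");
        }
        return new Object();
    }

    // Mimics the shape of stop(): update the status first, then look up the step.
    static void stopLikeOperator(FakeRepository repo, boolean beanAvailable) {
        repo.status = "STOPPING";
        try {
            lookupStep(beanAvailable);
        } catch (StepNotFound e) {
            // Only this checked exception is handled here.
            System.out.println("Step not found: " + e.getMessage());
        }
    }

    // Simulates a transactional wrapper: undo the update if an unchecked exception escapes.
    static String runInFakeTransaction(FakeRepository repo, boolean beanAvailable) {
        String before = repo.status;
        try {
            stopLikeOperator(repo, beanAvailable);
        } catch (RuntimeException e) {
            repo.status = before; // "rollback"
        }
        return repo.status;
    }

    public static void main(String[] args) {
        System.out.println(runInFakeTransaction(new FakeRepository(), true));  // STOPPING
        System.out.println(runInFakeTransaction(new FakeRepository(), false)); // STARTED
    }
}
```

In the second call the status update is undone, which matches what I observed: the job stayed in its previous state even though stop() had already set STOPPING.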
The cause was that I had put the @JobScope annotation on the definition of the partitioned master step:
@Bean(name = StepNameConstant.TRANSACTIONS_MASTER_STEP)
@JobScope
public Step transMasterStep(StepBuilderFactory stepBuilderFactory,
@Value("#{jobParameters[" + JobParamConstant.PROCESSDATE + "]}") Date processDate, TaskExecutorPartitionHandler partitionHandler,
@Qualifier(StepNameConstant.TRANS_STEP) Step transStep,
TransPartitioner transPartitioner,
TransStepListener transStepListener)
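This annotation turns the step bean into a lazily created dynamic proxy, which means the real bean is only created when it is actually used. At the point where stop() fetched it, the bean could not be created from its parameters: there was no JobContext on that thread, so the job parameters could not be resolved and the bean was not created correctly.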
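The behaviour of such a proxy can be illustrated with a plain JDK dynamic proxy. This is a simplified sketch of the idea, not how Spring builds its scoped proxies: `NamedStep`, `LazyScopedProxyDemo` and the `CONTEXT` thread-local are hypothetical names, and the string stored in `CONTEXT` stands in for the job parameters.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

// Hypothetical stand-in for a step-like interface.
interface NamedStep {
    String getName();
}

class LazyScopedProxyDemo {
    // Stand-in for the per-thread job context; null means no job context on this thread.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Creates the real target; fails when the current thread has no context.
    static NamedStep createTarget() {
        String processDate = CONTEXT.get();
        if (processDate == null) {
            throw new IllegalStateException("no context on current thread");
        }
        return () -> "masterStep@" + processDate;
    }

    // Returns a proxy that resolves the target on each call, not when the proxy is created.
    static NamedStep lazyProxy(Supplier<NamedStep> targetFactory) {
        InvocationHandler handler = (proxy, method, args) -> {
            NamedStep target = targetFactory.get();
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return (NamedStep) Proxy.newProxyInstance(
                NamedStep.class.getClassLoader(),
                new Class<?>[] { NamedStep.class },
                handler);
    }

    public static void main(String[] args) {
        // Creating the proxy succeeds even though no context exists yet.
        NamedStep step = lazyProxy(LazyScopedProxyDemo::createTarget);
        try {
            step.getName();
        } catch (IllegalStateException e) {
            System.out.println("without context: " + e.getMessage());
        }
        CONTEXT.set("2020-01-01");
        try {
            System.out.println("with context: " + step.getName());
        } finally {
            CONTEXT.remove();
        }
    }
}
```

The proxy object always exists, so obtaining it never fails. The failure only shows up when a method is called on a thread that has no context, which is the situation stop() ends up in.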
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    beanFactory.registerScope(name, this);
    if(!autoProxy) {
        return;
    }
    Assert.state(beanFactory instanceof BeanDefinitionRegistry,
            "BeanFactory was not a BeanDefinitionRegistry, so JobScope cannot be used.");
    BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
    for (String beanName : beanFactory.getBeanDefinitionNames()) {
        if (!beanName.startsWith(getTargetNamePrefix())) {
            BeanDefinition definition = beanFactory.getBeanDefinition(beanName);
            // Replace this or any of its inner beans with scoped proxy if it
            // has this scope
            boolean scoped = name.equals(definition.getScope());
            Scopifier scopifier = new Scopifier(registry, name, proxyTargetClass, scoped);
            scopifier.visitBeanDefinition(definition);
            if (scoped && !definition.isAbstract()) {
                createScopedProxy(beanName, definition, registry, proxyTargetClass);
            }
        }
    }
}

private <T> T createLazyProxy(AtomicReference<T> reference, Class<T> type) {
    ProxyFactory factory = new ProxyFactory();
    factory.setTargetSource(new ReferenceTargetSource<>(reference));
    factory.addAdvice(new PassthruAdvice());
    factory.setInterfaces(new Class<?>[] { type });
    @SuppressWarnings("unchecked")
    T proxy = (T) factory.getProxy();
    return proxy;
}

private class ReferenceTargetSource<T> extends AbstractLazyCreationTargetSource {
    private AtomicReference<T> reference;

    public ReferenceTargetSource(AtomicReference<T> reference) {
        this.reference = reference;
    }

    @Override
    protected Object createObject() throws Exception {
        initialize();
        return reference.get();
    }
}
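The code above handles the scope annotation and the creation of the object. So how should the problem actually be solved? For that, one more piece of code is worth reading: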
@Aspect
public class JobScopeManager {
    @Around("execution(void org.springframework.batch.core.Job+.execute(*)) && target(job) && args(jobExecution)")
    public void execute(Job job, JobExecution jobExecution) {
        JobSynchronizationManager.register(jobExecution);
        try {
            job.execute(jobExecution);
        }
        finally {
            JobSynchronizationManager.release();
        }
    }
}

@Override
public final void execute(JobExecution execution) {
    Assert.notNull(execution, "jobExecution must not be null");
    if (logger.isDebugEnabled()) {
        logger.debug("Job execution starting: " + execution);
    }
    JobSynchronizationManager.register(execution);
    LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer("job.active", "Active jobs");
    LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start();
    Timer.Sample timerSample = BatchMetrics.createTimerSample();
    try {
        jobParametersValidator.validate(execution.getJobParameters());
        if (execution.getStatus() != BatchStatus.STOPPING) {
            execution.setStartTime(new Date());
            updateStatus(execution, BatchStatus.STARTED);
            listener.beforeJob(execution);
            try {
                doExecute(execution);
                if (logger.isDebugEnabled()) {
                    logger.debug("Job execution complete: " + execution);
                }
            } catch (RepeatException e) {
                throw e.getCause();
            }
        } else {
            // The job was already stopped before we even got this far. Deal
            // with it in the same way as any other interruption.
            execution.setStatus(BatchStatus.STOPPED);
            execution.setExitStatus(ExitStatus.COMPLETED);
            if (logger.isDebugEnabled()) {
                logger.debug("Job execution was stopped: " + execution);
            }
        }
    } catch (JobInterruptedException e) {
        logger.info("Encountered interruption executing job: "
                + e.getMessage());
        if (logger.isDebugEnabled()) {
            logger.debug("Full exception", e);
        }
        execution.setExitStatus(getDefaultExitStatusForFailure(e, execution));
        execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus()));
        execution.addFailureException(e);
    } catch (Throwable t) {
        logger.error("Encountered fatal error executing job", t);
        execution.setExitStatus(getDefaultExitStatusForFailure(t, execution));
        execution.setStatus(BatchStatus.FAILED);
        execution.addFailureException(t);
    } finally {
        try {
            if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED)
                    && execution.getStepExecutions().isEmpty()) {
                ExitStatus exitStatus = execution.getExitStatus();
                ExitStatus newExitStatus =
                        ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");
                execution.setExitStatus(exitStatus.and(newExitStatus));
            }
            timerSample.stop(BatchMetrics.createTimer("job", "Job duration",
                    Tag.of("name", execution.getJobInstance().getJobName()),
                    Tag.of("status", execution.getExitStatus().getExitCode())
            ));
            longTaskTimerSample.stop();
            execution.setEndTime(new Date());
            try {
                listener.afterJob(execution);
            } catch (Exception e) {
                logger.error("Exception encountered in afterJob callback", e);
            }
            jobRepository.update(execution);
        } finally {
            JobSynchronizationManager.release();
        }
    }
}
Both of them call JobSynchronizationManager.register(jobExecution) before executing the job, so I looked up the Javadoc of that class:
Central convenience class for framework use in managing the job scope context. Generally only to be used by implementations of {@link Job}. N.B. it is the responsibility of every {@link Job} implementation to ensure that a {@link JobContext} is available on every thread that might be involved in a job execution, including worker threads from a pool.
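The phrase "available on every thread" is the key point. A context that is held per thread is invisible to any other thread, including the one that handles the stop request. The sketch below shows this with a plain `ThreadLocal`; `PerThreadContextDemo` and the string value are hypothetical, and this is an illustration of the principle rather than the actual JobSynchronizationManager implementation.

```java
class PerThreadContextDemo {
    // Stand-in for a per-thread context holder; not the Spring Batch implementation.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Reads the context from a freshly started thread and returns what that thread saw.
    static String readFromOtherThread() {
        String[] seen = new String[1];
        Thread other = new Thread(() -> seen[0] = CONTEXT.get());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        CONTEXT.set("jobExecution#42");
        try {
            System.out.println("same thread:  " + CONTEXT.get());           // jobExecution#42
            System.out.println("other thread: " + readFromOtherThread());   // null
        } finally {
            CONTEXT.remove();
        }
    }
}
```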
There are two ways to approach this problem, and I tried both:
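JobExecution jobExecution = jobExplorer.getJobExecution(jobStat.getJobExecutionId());
try {
    // Make a JobContext available on the calling thread so the @JobScope step can be resolved
    JobSynchronizationManager.register(jobExecution);
    jobOperator.stop(jobStat.getJobExecutionId());
} finally {
    // Unregister the context again, whether or not stop() succeeded
    JobSynchronizationManager.close();
}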
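The register-then-always-unregister pattern used here can be wrapped in a small helper so that the cleanup is never forgotten. The following is a framework-free sketch under stated assumptions: `ContextScopeDemo`, `withContext` and the `CONTEXT` thread-local are hypothetical names standing in for the register and close calls, and nothing here is a Spring Batch API.

```java
import java.util.function.Supplier;

class ContextScopeDemo {
    // Stand-in for the per-thread job context.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Registers the context, runs the action, and always clears the context afterwards.
    static <T> T withContext(String context, Supplier<T> action) {
        CONTEXT.set(context);
        try {
            return action.get();
        } finally {
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        String result = withContext("jobExecution#42", () -> "stopping " + CONTEXT.get());
        System.out.println(result);                    // stopping jobExecution#42
        System.out.println("after: " + CONTEXT.get()); // after: null
    }
}
```

Clearing the context in `finally` matters because request-handling threads are usually pooled, and a leftover context could otherwise leak into an unrelated later call on the same thread.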
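I suspect this may be a bug in Spring Batch, since the registration could just as well be done inside stop() itself. Of course it may not be a bug; I have not fully understood the root cause. If anyone more experienced can explain it, corrections and discussion are welcome.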