当前位置: 首页 > 知识库问答 >
问题:

Spring批处理中没有可用于作业范围的上下文持有者

邰棋
2023-03-14

我试图在Spring批处理作业中使用多线程步骤,但我得到一个“范围‘作业’对于当前线程不活动……”。我在Spring中尝试了几种方法,但目前我正在使用我认为是OOTB Spring构造的方法,但仍然失败。

错误是:

2021-09-19 22:40:03,432 ERROR [https-jsse-nio-8448-exec-4]: org.springframework.batch.core.step.AbstractStep Encountered an error executing step writeToDatabaseStep in job softlayerUploadJob
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.softLayerDataItemQueue': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:365)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:192)
    at com.sun.proxy.$Proxy126.read(Unknown Source)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161)
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113)
    at org.html" target="_blank">springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159)
    at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:353)
    ... 19 common frames omitted

基本作业结构简化:作业SoftLayerUpload作业步骤:softlayerUploadFileStep(不能多线程)从Excel文件读取写入SoftLayerDataItemQueue)bean,最终写入java.util.队列步骤:WriteToDatabase aseStep从SoftLayerDataItemQueue bean读取使用JpaWriter写入数据库

SoftLayerJob配置。JAVA

public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> {
    private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);

    private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();

//  private Queue<SoftLayerData> queue = new LinkedList<>();

    // @Value("#{stepExecution.jobExecution.jobInstance.instanceId}")
    @Value("#{jobExecution.jobInstance.instanceId}")
    private int jobInstanceId;

    public Queue<SoftLayerData> getQueue() {
        Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
        logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
        if (result == null) {
            result = new LinkedList<>();
            queueMap.put(jobInstanceId, result);
            
        }
        logger.info("Returning queue with item count=" + result.size());
        return result;
    }

    @Override
    public void write(List<? extends SoftLayerData> items) throws Exception {
        logger.info("@@@ Attempting to add item to queue with bean hashCode=" + this.hashCode() + " job instanceid="
                + jobInstanceId + " ");
        logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
        if (logger.isDebugEnabled()) {
            for (SoftLayerData item : items) {
                logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
            }
        }
        getQueue().addAll(items);
    }

    @Override
    public SoftLayerData read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        logger.info("@@@ Attempting to remove item from queue with bean hashCode=" + this.hashCode()
                + " job instanceid=" + jobInstanceId + " ");
        SoftLayerData result = null;
        if (getQueue() != null && getQueue().size() > 0) {
            result = getQueue().remove();
            logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
        } else {
            logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
        }
        
        return result;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        logger.info("SoftLayerDataItemQueue: open()");
        /* Unused method */
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        logger.info("SoftLayerDataItemQueue: update()");
        /* Unused method */
    }

    @Override
    public void close() throws ItemStreamException {
        logger.info("SoftLayerDataItemQueue: close()");
        /* Unused method */
    }

}

SoftLayerDataItemQueue。JAVA

public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> {
    private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);

    private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();

    @Value("#{jobExecution.jobInstance.instanceId}")
    private int jobInstanceId;

    public Queue<SoftLayerData> getQueue() {
        Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
        logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
        if (result == null) {
            result = new LinkedList<>();
            queueMap.put(jobInstanceId, result);
            
        }
        logger.info("Returning queue with item count=" + result.size());
        return result;
    }

    @Override
    public void write(List<? extends SoftLayerData> items) throws Exception {
        logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
        if (logger.isDebugEnabled()) {
            for (SoftLayerData item : items) {
                logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
            }
        }
        getQueue().addAll(items);
    }

    @Override
    public SoftLayerData read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                + " job instanceid=" + jobInstanceId + " ");
        SoftLayerData result = null;
        if (getQueue() != null && getQueue().size() > 0) {
            result = getQueue().remove();
            logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
        } else {
            logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
        }
        
        return result;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        logger.info("SoftLayerDataItemQueue: open()");
        /* Unused method */
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        logger.info("SoftLayerDataItemQueue: update()");
        /* Unused method */
    }

    @Override
    public void close() throws ItemStreamException {
        logger.info("SoftLayerDataItemQueue: close()");
        /* Unused method */
    }

}

注意:我不喜欢使用SoftLayerDataItemQueue,但我想不出任何其他方法来写入在一个步骤中处理的项目,并在另一个步骤中读取它们,尤其是对于大容量和并行处理。我希望Spring能有某种方式将数据从一个步骤写入另一个步骤,但我找不到它。其他人建议写一个文件,或者在工作或步骤上下文中。

共有1个答案

程和畅
2023-03-14

从某种意义上说,这只是一个部分答案,因为这解决了我的问题,但我无法解释为什么这是必要的。遵循https://github.com/spring-projects/spring-batch/issues/1335

我创建了自己的SimpleAsyncTaskExecutor,如下所示:

public class ParallelSimpleAsyncTaskExecutor extends SimpleAsyncTaskExecutor {
    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    public ParallelSimpleAsyncTaskExecutor(String prefix) {
        super(prefix);
    }

    @Override
    protected void doExecute(Runnable task) {
        JobExecution jobExecution = JobSynchronizationManager.getContext().getJobExecution();
        super.doExecute(new Runnable() {
            @Override
            public
            void run() {
                JobSynchronizationManager.register(jobExecution);
                try {
                    task.run();
                } finally {
                    JobSynchronizationManager.release();
                }
                
            }
        });
    }

}

 类似资料:
  • 在我的Spring批处理作业中,我试图使用JobExecutionContext在步骤之间共享数据,只有当我将步骤保持为单线程时,它才会起作用,如下所示: 但是,添加时发生错误: 我试着像这样解决这个问题:https://github.com/spring-projects/spring-batch/issues/1335,但它似乎只使用了主线程之外的一个线程。 有没有办法在不添加经过调整的代码的

  • 这是我的简单工作配置: 如您所见,我已按照此处的说明配置了bean,并将以下行添加到我的中: 这就是我犯的错误。我哪里做错了? 请注意,我的项目配置了2个数据源,以及Spring MVC Web: 数据源1,对于JPA/Hibrate: 数据源2,用于Spring批处理JDBC作业存储库: 我找不到这里提到的神秘XML。 出什么事了? 谢谢。

  • 如果我有一个Web应用程序,它的应用程序上下文加载了我的webapp和所有作业配置文件的所有内容,如果我的作业中有一个没有范围="步骤"的简单ItemReader,那么阅读器是单例的,对吗?所以如果我通过SimpleJobLauncher从控制器启动两次作业,我会使用同一个bean,对吗?除非我放入范围="步骤",以便每个作业执行一个bean? 另一方面,如果我从CommandLineJobRun

  • 我试图在运行时设置spring批处理作业的块大小,方法是将其注入我的步骤中,如下所示: 但我得到以下错误:java.lang.非法状态异常:没有上下文持有人可用的工作范围 我在网上做了一些研究,但不明白为什么我会遇到这个异常。如果您能帮助我理解这个错误的含义以及如何解决它,我将不胜感激。谢谢!

  • 我只想知道spring batch中是否有“job”范围,就像“step”范围一样?如果没有,我们应该开发我们的自定义范围,还是有更好的替代方案? 提前谢了。

  • 我正在处理一个使用Spring批处理的项目。在本项目中,我使用Spring批处理后期绑定,其中我使用JobParameters注入了一个参数(将用作SQL读取器查询的条件的字符串)。目前,我正在使用进行后期绑定,所有操作都非常正常。 这里我要问的是何时使用以及何时使用。我已经阅读了Spring批处理参考文件,并在谷歌上搜索了StepScope和jobscope。我得到的只是: a.StepScop