当前位置: 首页 > 知识库问答 >
问题:

当前线程的作用域“作业”未处于活动状态,没有可用于作业作用域Spring批处理的上下文持有者

尉迟兴修
2023-03-14

在我的Spring批处理作业中,我试图使用JobExecutionContext在步骤之间共享数据,只有当我将步骤保持为单线程时,它才会起作用,如下所示:

    @EnableTask
    @EnableBatchProcessing
    @Configuration
    @PropertySource(value = {"classpath:application.properties"})

    public class Config{

    private static final HashMap<String,Object> OVERRIDDEN_BY_EXPRESSION = null;
    private static final String QUERY = "SELECT * FROM \"Config\"";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    EntityManager em;


    @Autowired
    DataSource dataSource;


    /*Config Step*/

   @Bean
    public JdbcCursorItemReader<BatchConfig> configReader(DataSource dataSource) {
        JdbcCursorItemReader<BatchConfig> config = new JdbcCursorItemReader<>();
            config.setDataSource(dataSource);

            config.setSql(QUERY);
            config.setRowMapper(new BatchRowMapper());

            return config;
    }

    @Bean
    public ItemWriter<BatchConfig> itemWriter() {
        return new ItemWriter<BatchConfig>() {

            private StepExecution stepExecution;

            @Override
            public void write(List<? extends BatchConfig> items) {
                ExecutionContext stepContext = this.stepExecution.getExecutionContext();
                for (BatchConfig item : items) {
                    HashMap<String, Object> table = new HashMap<>();
                    table.put("date", item.getDate_time());
                    table.put("size", item.getSize());
                    System.out.println(table);
                    stepContext.put(item.getName(), table);

                }
            }

            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
            }
        };
    }

    @Bean
    public Step stepConfig(JdbcCursorItemReader<BatchConfig> configReader) throws Exception {
        return stepBuilderFactory.get("stepConfig")
                .<BatchConfig, BatchConfig>chunk(10)
                .reader(configReader)
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"COUNTRY", "CATEGORY", "USER"});
        return listener;
    }



    /*Country Step*/

    @JobScope
    @Bean
    public MongoItemReader<COUNTRY> CountryItemReader(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> table) {
       int date = (int) table.get("date");
        MongoItemReader<COUNTRY> reader = new MongoItemReader<COUNTRY>();
        reader.setTemplate(mongoTemplate);
        reader.setTargetType(COUNTRY.class);
        reader.setCollection("COUNTRY");
        reader.setFields("{\"COUNTRY_NAME\": 1,\"SHORT_NAME\": 1,\"DEPT_CODE\": 1}");
        reader.setSort(new HashMap<String, Sort.Direction>() {{
            put("_id", Sort.Direction.DESC);
        }});
        reader.setQuery("{DATE_TIME: {$gt:"+date+"}}");
        reader.setPageSize(250);
        return reader;
    }

    @Bean
    public CountryItemProcessor CountryProcessor(){
        return new CountryItemProcessor();
    }

    @Bean
    public JpaItemWriter<COUNTRY> country_writer(){
        JpaItemWriter<COUNTRY> jpa = new JpaItemWriter<COUNTRY>();
        jpa.setEntityManagerFactory(em.getEntityManagerFactory());
        return jpa;
    }


    @JobScope
    @Bean
    public Step step1(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> tab) {
        int size = (int) tab.get("size");
        //System.out.println(size);
        return stepBuilderFactory.get("step1")
                .<COUNTRY, COUNTRY>chunk(20)
                .reader(CountryItemReader(OVERRIDDEN_BY_EXPRESSION))
                .writer(country_writer())
                .build();
    }



    @Bean
    public Job TestJob(Step stepConfig) throws Exception {

        return this.jobBuilderFactory.get("TestJob")
                .incrementer(new RunIdIncrementer())// because a spring config bug, this incrementer is not really useful
                .start(stepConfig)
                .next(step1(OVERRIDDEN_BY_EXPRESSION))
               
                .build();
    }

}

但是,添加SimpleAsynctaskeExecutor时发生错误:

    org.springframework.beans.factory.support.ScopeNotActiveException: Error creating bean with name 'scopedTarget.CountryItemReader': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:383) ~[spring-beans-5.3.6.jar:5.3.6]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.6.jar:5.3.6]
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-5.3.6.jar:5.3.6]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:676) ~[spring-aop-5.3.6.jar:5.3.6]
    at org.springframework.batch.item.data.MongoItemReader$$EnhancerBySpringCGLIB$$67443e4.read(<generated>) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.6.jar:5.3.6]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:371) ~[spring-beans-5.3.6.jar:5.3.6]

我试着像这样解决这个问题:https://github.com/spring-projects/spring-batch/issues/1335,但它似乎只使用了主线程之外的一个线程。

有没有办法在不添加经过调整的代码的情况下解决这个问题?

我计划在Kubernetes上使用远程分区来扩展作业,这个问题会因为作业范围而持续吗?

任何想法或建议都是非常受欢迎的。

共有1个答案

尤俊誉
2023-03-14

我正在尝试使用JobExecutionContext在步骤之间共享数据,这只有在我将步骤保持为单线程的情况下才有效

依赖执行上下文在多线程步骤之间共享数据是不正确的,因为这些键将被并发线程覆盖。参考文档明确提到要在多线程环境中关闭状态管理:

  • Javadoc:如果在多线程客户端中使用,请记住使用saveState=false
  • 参考文档:不建议在多线程或分区步骤中使用作业范围的bean

也就是说,我不知道从一个多线程步骤到下一个步骤可以共享什么密钥(因为线程是并行执行的),但如果确实需要这样做,应该使用另一种方法,比如定义线程安全的共享bean。

 类似资料:
  • 我试图在Spring批处理作业中使用多线程步骤,但我得到一个“范围‘作业’对于当前线程不活动……”。我在Spring中尝试了几种方法,但目前我正在使用我认为是OOTB Spring构造的方法,但仍然失败。 错误是: 基本作业结构简化:作业SoftLayerUpload作业步骤:softlayerUploadFileStep(不能多线程)从Excel文件读取写入SoftLayerDataItemQu

  • 当前线程的作用域“请求”未处于活动状态;如果您想从一个单体引用它,请考虑为这个bean定义一个作用域代理;嵌套的异常是java。lang.IllegalStateException:未找到线程绑定请求:您是指实际web请求之外的请求属性,还是在原始接收线程之外处理请求?如果您实际上在web请求中操作,并且仍然收到此消息,那么您的代码可能在DispatcherServlet/DispatcherPo

  • 当编写器抛出异常时,我希望能够将步骤和作业状态设置为失败。在做了一些调试和检查Spring批处理源代码后,我注意到配置了一个,它认为是一个致命的异常,因此将作业状态设置为FAILED,所以我将代码包装在我的编写器中的一个try-get中,将包装在中,现在作业和步骤状态设置为FAILED,这是我想要的。我不确定这是否是正确的方法,因为我在任何地方都找不到它的文档,的留档也没有提到它。所以,问题是:这

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我有一个作业,它有一个并行执行的块步骤(8个分区): 阅读器:jdbcCursorItemReader 处理器:使用jdbcTemplate调用数据库(每个分区1个线程) Writer:写入文件 我使用一个JdbcCursorItemReader从共享的Postgres数据库(V9.2)读取数百万数据。(其他用户同时使用数据库) 谢谢你的帮助

  • 我在我的JAVA应用程序中配置了Spring批处理作业,该应用程序在集群中运行。因此,相同的作业被执行两次,这是我不想要的。 所以我想在作业中配置一个步骤,它将检查CREATE_DATE是否在BATCH_JOB_EXECUTION表中存在,并将继续或故障转移。 如何在spring批处理步骤中进行配置?