在我的Spring批处理作业中,我试图使用JobExecutionContext在步骤之间共享数据,只有当我将步骤保持为单线程时,它才会起作用,如下所示:
@EnableTask
@EnableBatchProcessing
@Configuration
@PropertySource(value = {"classpath:application.properties"})
public class Config{
private static final HashMap<String,Object> OVERRIDDEN_BY_EXPRESSION = null;
private static final String QUERY = "SELECT * FROM \"Config\"";
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
EntityManager em;
@Autowired
DataSource dataSource;
/*Config Step*/
@Bean
public JdbcCursorItemReader<BatchConfig> configReader(DataSource dataSource) {
JdbcCursorItemReader<BatchConfig> config = new JdbcCursorItemReader<>();
config.setDataSource(dataSource);
config.setSql(QUERY);
config.setRowMapper(new BatchRowMapper());
return config;
}
@Bean
public ItemWriter<BatchConfig> itemWriter() {
return new ItemWriter<BatchConfig>() {
private StepExecution stepExecution;
@Override
public void write(List<? extends BatchConfig> items) {
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
for (BatchConfig item : items) {
HashMap<String, Object> table = new HashMap<>();
table.put("date", item.getDate_time());
table.put("size", item.getSize());
System.out.println(table);
stepContext.put(item.getName(), table);
}
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
};
}
@Bean
public Step stepConfig(JdbcCursorItemReader<BatchConfig> configReader) throws Exception {
return stepBuilderFactory.get("stepConfig")
.<BatchConfig, BatchConfig>chunk(10)
.reader(configReader)
.writer(itemWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"COUNTRY", "CATEGORY", "USER"});
return listener;
}
/*Country Step*/
@JobScope
@Bean
public MongoItemReader<COUNTRY> CountryItemReader(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> table) {
int date = (int) table.get("date");
MongoItemReader<COUNTRY> reader = new MongoItemReader<COUNTRY>();
reader.setTemplate(mongoTemplate);
reader.setTargetType(COUNTRY.class);
reader.setCollection("COUNTRY");
reader.setFields("{\"COUNTRY_NAME\": 1,\"SHORT_NAME\": 1,\"DEPT_CODE\": 1}");
reader.setSort(new HashMap<String, Sort.Direction>() {{
put("_id", Sort.Direction.DESC);
}});
reader.setQuery("{DATE_TIME: {$gt:"+date+"}}");
reader.setPageSize(250);
return reader;
}
@Bean
public CountryItemProcessor CountryProcessor(){
return new CountryItemProcessor();
}
@Bean
public JpaItemWriter<COUNTRY> country_writer(){
JpaItemWriter<COUNTRY> jpa = new JpaItemWriter<COUNTRY>();
jpa.setEntityManagerFactory(em.getEntityManagerFactory());
return jpa;
}
@JobScope
@Bean
public Step step1(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> tab) {
int size = (int) tab.get("size");
//System.out.println(size);
return stepBuilderFactory.get("step1")
.<COUNTRY, COUNTRY>chunk(20)
.reader(CountryItemReader(OVERRIDDEN_BY_EXPRESSION))
.writer(country_writer())
.build();
}
@Bean
public Job TestJob(Step stepConfig) throws Exception {
return this.jobBuilderFactory.get("TestJob")
.incrementer(new RunIdIncrementer())// because a spring config bug, this incrementer is not really useful
.start(stepConfig)
.next(step1(OVERRIDDEN_BY_EXPRESSION))
.build();
}
}
但是,添加SimpleAsynctaskeExecutor
时发生错误:
org.springframework.beans.factory.support.ScopeNotActiveException: Error creating bean with name 'scopedTarget.CountryItemReader': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:383) ~[spring-beans-5.3.6.jar:5.3.6]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.6.jar:5.3.6]
at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-5.3.6.jar:5.3.6]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:676) ~[spring-aop-5.3.6.jar:5.3.6]
at org.springframework.batch.item.data.MongoItemReader$$EnhancerBySpringCGLIB$$67443e4.read(<generated>) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.6.jar:5.3.6]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:371) ~[spring-beans-5.3.6.jar:5.3.6]
我试着像这样解决这个问题:https://github.com/spring-projects/spring-batch/issues/1335,但它似乎只使用了主线程之外的一个线程。
有没有办法在不添加经过调整的代码的情况下解决这个问题?
我计划在Kubernetes上使用远程分区来扩展作业,这个问题会因为作业范围而持续吗?
任何想法或建议都是非常受欢迎的。
我正在尝试使用JobExecutionContext在步骤之间共享数据,这只有在我将步骤保持为单线程的情况下才有效
依赖执行上下文在多线程步骤之间共享数据是不正确的,因为这些键将被并发线程覆盖。参考文档明确提到要在多线程环境中关闭状态管理:
如果在多线程客户端中使用,请记住使用saveState=false
不建议在多线程或分区步骤中使用作业范围的bean
也就是说,我不知道从一个多线程步骤到下一个步骤可以共享什么密钥(因为线程是并行执行的),但如果确实需要这样做,应该使用另一种方法,比如定义线程安全的共享bean。
我试图在Spring批处理作业中使用多线程步骤,但我得到一个“范围‘作业’对于当前线程不活动……”。我在Spring中尝试了几种方法,但目前我正在使用我认为是OOTB Spring构造的方法,但仍然失败。 错误是: 基本作业结构简化:作业SoftLayerUpload作业步骤:softlayerUploadFileStep(不能多线程)从Excel文件读取写入SoftLayerDataItemQu
当前线程的作用域“请求”未处于活动状态;如果您想从一个单体引用它,请考虑为这个bean定义一个作用域代理;嵌套的异常是java。lang.IllegalStateException:未找到线程绑定请求:您是指实际web请求之外的请求属性,还是在原始接收线程之外处理请求?如果您实际上在web请求中操作,并且仍然收到此消息,那么您的代码可能在DispatcherServlet/DispatcherPo
当编写器抛出异常时,我希望能够将步骤和作业状态设置为失败。在做了一些调试和检查Spring批处理源代码后,我注意到配置了一个,它认为是一个致命的异常,因此将作业状态设置为FAILED,所以我将代码包装在我的编写器中的一个try-get中,将包装在中,现在作业和步骤状态设置为FAILED,这是我想要的。我不确定这是否是正确的方法,因为我在任何地方都找不到它的文档,的留档也没有提到它。所以,问题是:这
我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。
我有一个作业,它有一个并行执行的块步骤(8个分区): 阅读器:jdbcCursorItemReader 处理器:使用jdbcTemplate调用数据库(每个分区1个线程) Writer:写入文件 我使用一个JdbcCursorItemReader从共享的Postgres数据库(V9.2)读取数百万数据。(其他用户同时使用数据库) 谢谢你的帮助
我在我的JAVA应用程序中配置了Spring批处理作业,该应用程序在集群中运行。因此,相同的作业被执行两次,这是我不想要的。 所以我想在作业中配置一个步骤,它将检查CREATE_DATE是否在BATCH_JOB_EXECUTION表中存在,并将继续或故障转移。 如何在spring批处理步骤中进行配置?