当前位置: 首页 > 知识库问答 >
问题:

Spring批分割在itemReader中注入stepExecutionContext参数

葛奇
2023-03-14

我的Spring批处理配置:

@Bean
@StepScope
public ItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[filename]}") String filename) 
    throws UnexpectedInputException, ParseException {
    FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens = { "username", "userid", "transactiondate", "amount" };
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource(
        "input/"+filename));
    DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}
@Bean(name = "partitioningJob")  
public Job partitioningJob() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return jobs.get("partitioningJob").listener(jobListener()).start(partitionStep()).build();  
}  

@Bean 
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("partitionStep").partitioner(step2()).partitioner("step2", partitioner()).gridSize(2).taskExecutor(taskExecutor).build();  
}  

@Bean 
public Step step2() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("step2").<Transaction, Transaction> chunk(1).reader(itemReader(null)).processor(itemProcessor()).writer(itemWriter(marshaller(),null)).build();  
}  

@Bean 
public TransactionPartitioner partitioner() {  
    TransactionPartitioner partitioner = new TransactionPartitioner();  
    return partitioner;  
}                           

@Bean 
public JobListener jobListener() {  
   return new JobListener();  
} 

 @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(2);
        taskExecutor.setQueueCapacity(2);
        taskExecutor.setCorePoolSize(2);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }  

我的TransactionPartition类是:

public class TransactionPartitioner implements Partitioner {  

public Map<String, ExecutionContext> partition(int range) {  
    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();  
    for (int i = 1; i <= range; i++) {  
        ExecutionContext exContext = new ExecutionContext();  
        exContext.put("filename", "input"+i+".csv");
        exContext.put("name", "Thread" + i);  
        result.put("partition" + i, exContext);  
    }       
    return result;  
}  
}

这不是正确的做法吗?请建议。

  18:23:39.060 [main] DEBUG org.springframework.batch.core.job.AbstractJob - Upgrading JobExecution status: StepExecution: id=1, version=2, name=partitionStep, status=FAILED, exitStatus=FAILED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
    at com.sun.proxy.$Proxy19.run(Unknown Source)
    at org.baeldung.spring_batch_intro.App.main(App.java:24)
; org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:147)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): class path resource [input/null]
    at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:144)
    ... 9 more
  | STEP_EXECUTION_ID | SHORT_CONTEXT | SERIALIZED_CONTEXT |
|                 1 | {"map":[{"entry":[{"string":"SimpleStepExecutionSplitter.GRID_SIZE","long":2},{"string":["batch.stepType","org.springframework.batch.core.partition.support.PartitionStep"]}]}]} | NULL    
|                 2 | {"map":[{"entry":[{"string":["filename","input2.csv"]},{"string":["name","Thread2"]}]}]}                                                                                            | NULL               |
|                 3 | {"map":[{"entry":[{"string":["filename","input1.csv"]},{"string":["name","Thread1"]}]}]}  

共有1个答案

哈宪
2023-03-14

应用程序代码不负责调用partition方法,

@Bean 
public TransactionPartitioner partitioner() {  
    TransactionPartitioner partitioner = new TransactionPartitioner();  
    partitioner.partition(10);  
    return partitioner;  
}  

框架将为您调用partition方法。只需返回partitioner,而无需显式调用partition(10)方法。

话虽如此,您需要在partitioner步骤中设置partitionergridsize,如下所示,

@Bean 
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("partitionStep").partitioner(step2()).partitioner("step2", partitioner()).gridSize(10).taskExecutor(taskExecutor).build();  
}  
 类似资料:
  • spring batch的新版本,并尝试为大量数据输入实现分区。我已经检查了下面的解决方案,这些解决方案与我的问题相似,但不能解决问题。

  • 问题内容: 这是我的一部分: This is the item reader: 这是Spring Batch在运行时所说的: 怎么了 在Spring 3.0中,我在哪里可以了解有关这些机制的更多信息? 问题答案: 如前所述,你的阅读器需要进行“逐步”调整。你可以通过注释完成此操作。如果你将该注释添加到阅读器,则它应该对你有用,如下所示: 该范围默认情况下不可用,但是如果你正在使用XML名称空间,则

  • 问题内容: 这是我的一部分: 这是商品阅读器: 这是Spring Batch在运行时所说的: 怎么了 在Spring 3.0中,我在哪里可以了解有关这些机制的更多信息? 问题答案: 如前所述,您的阅读器需要进行“逐步”调整。您可以通过注释完成此操作。如果您将该注释添加到阅读器,则它应该对您有用,如下所示: 该范围默认情况下不可用,但是如果您正在使用XML名称空间,则该范围将不可用。如果不是这样,请

  • 我有一个Spring Boot(面向批处理)应用程序,它使用一个数据源来完成批处理作业,并将内容写入数据库。 我在类似: 问题是,当我尝试将数据源注入一个Spring配置文件时: ...它告诉我: 无法自动连线。存在多个“DataSource”类型的bean。 Beans:数据源 我还尝试注入数据源,例如: ...但是没有运气:(,尽管这两个数据源的问题最终消失了。 有什么线索可以“绕过”吗?

  • 我正在为Grails使用Spring批处理插件(Spring-batch-1.0.RC2)。到目前为止工作正常,但我想分割流来执行。。。支持吗?这是我试图执行的代码。。。但结果是第一步,第二步,第三步,第四步。 谢啦!

  • 我在Spring3.2.1中测试了类似的scenarion。解决此问题的最佳方法是什么? 这个问题是在09年3月10日报告的。 为什么在Spring中仍然没有实现这个功能?是否有任何并发症妨碍解决这一不足?