@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
public JobBuilderFactory jobBuilderFactory;
public StepBuilderFactory stepBuilderFactory;
//these are the properties i need to dynamically change
private String[] names;
private Resource inputResource;
private String[] positions;
private String tableName;
private String columnNames;
private String values;
@Autowired
public void setJobBuilderFactory(JobBuilderFactory jobBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
}
@Autowired
public void setStepBuilderFactory(StepBuilderFactory stepBuilderFactory) {
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
BatchConfigurer configurer(@Qualifier("prestagingJpaDataSource") DataSource dataSource){
return new DefaultBatchConfigurer(dataSource);
}
@Bean
public FlatFileItemReader<String[]> reader() {
return new FlatFileItemReaderBuilder<String[]>()
.name("hacReader")
//put file here
.resource(inputResource)
.lineMapper(new DefaultLineMapper<String[]>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
//put column names here
setNames(names);
}});
//put column positions here
setFieldSetMapper(fieldSet -> positions);
}})
.build();
}
//this is obviously wrong, as i would need to use an ItemPreparedStatementSetter
@Bean
public JdbcBatchItemWriter<String[]> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<String[]>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO " + tableName + " (" + columnsNames + ") VALUES (" + values)
.dataSource(dataSource)
.build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter<String[]> writer) {
return stepBuilderFactory.get("step1")
.<String[], String[]> chunk(10)
.reader(reader())
//.processor(processor())
.writer(writer)
.build();
}
}
如果是,那么我需要做什么来让它们工作?
您可以通过将这些属性作为作业参数传递(在本例中,这些属性最好作为非标识性作业参数传递),并在运行时将它们在读取器中进行后期绑定。下面是一个例子:
@Bean
@StepScope
public ItemReader<String[]> itemReader(
@Value("#{jobParameters['fileName']}") String fileName,
@Value("#{jobParameters['columnNames']}") String columnNames
) {
return new FlatFileItemReaderBuilder<String[]>()
.name("hacReader")
//put file here
.resource(new FileSystemResource(fileName))
.lineMapper(new DefaultLineMapper<String[]>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
//put column names here
setNames(columnNames.split(","));
}});
}})
.build();
}
通过此设置,读取器将被动态配置为文件名
和columnnames
指定为作业参数:
JobParameters jobParameters = new JobParametersBuilder()
.addString("fileName", "/path/to/input/file")
.addString("columnNames", "column1,column2,column5")
.toJobParameters();
希望这能有所帮助。
是否可以在Spring批处理中动态配置作业? 这是我想做的。我创建了几个不同的,如下所示: FlatFileItemReader 我希望能够在创建批处理作业时动态混合和匹配它们。例如,假设我需要一个有2个步骤的作业。第一步包含一个用于预处理的。第二步将有一个,用于使用我的阅读器/写入器进行基于块的数据处理......类似这样的东西: 在XML中,我可以执行以下操作: 但是我如何像上面一样以编程方式
当编写器抛出异常时,我希望能够将步骤和作业状态设置为失败。在做了一些调试和检查Spring批处理源代码后,我注意到配置了一个,它认为是一个致命的异常,因此将作业状态设置为FAILED,所以我将代码包装在我的编写器中的一个try-get中,将包装在中,现在作业和步骤状态设置为FAILED,这是我想要的。我不确定这是否是正确的方法,因为我在任何地方都找不到它的文档,的留档也没有提到它。所以,问题是:这
在我的例子中,我使用Spring boot和MySQL DB。我有两个数据库用户。 管理员用户由liquibase用来创建表。它有自己的数据源 现在我正在尝试将Spring批处理添加到服务中。我得到了一个重复的bean异常。当我将@Primary添加到liquibase datasource时,它得到了修复,但我们不想将其用于批处理的读/写操作。 如果我在Dev User数据源中添加@Prime,
我们正在处理一个Spring批处理项目(Spring Boot1.2.2.Release),要求使用Spring SFTP集成以一定频率轮询从服务器位置读取文件。我们使用java config实现了Spring批处理,并在使用Spring Integration java config的过程中实现了Spring批处理。我找不到描述上述情况的例子。我浏览了各种链接,但看到的主要是XML配置示例。 h
我有下一个spring批处理配置类: 启动应用程序时,我收到下一个异常:
这是我第一次使用Spring批处理,我有一个例外: 我不知道如何解决这个错误。我使用Java配置来定义作业、步骤、读取器、处理器和写入器: null