根据我的研究,我知道Spring Batch提供API来处理许多不同类型的数据文件格式。
但我需要澄清我们如何在一个块/Tasklet中提供多个不同格式的文件。
为此,我知道有MultiResourceItemReader可以处理多个文件,但AFAIK所有文件都必须具有相同的格式和数据结构。
所以,问题是我们如何在一个Tasklet中提供多个不同数据格式的文件作为输入?
我不认为有现成的Spring批处理读取器用于多输入格式。
你必须自己建造。当然,您可以将现有的FileItemReader重新用作自定义文件读取器中的代理,对于每种文件类型/格式,请使用正确的文件类型/格式。
Asoub是对的,没有现成的Spring批处理读取器可以“全部读取!”。然而,只需几个相当简单和直接的类,就可以创建一个java config spring批处理应用程序,该应用程序将处理不同文件格式的不同文件。
对于我的一个应用程序,我有一个类似类型的用例,我编写了一堆相当简单和直截了当的Spring Batch框架实现和扩展,以创建我所说的“通用”阅读器。所以回答你的问题:下面你会找到我用来使用Spring批处理不同类型文件格式的代码。显然,在下面你会找到剥离的实现,但它应该会让你朝着正确的方向前进。
一行由记录表示:
public class Record {
private Object[] columns;
public void setColumnByIndex(Object candidate, int index) {
columns[index] = candidate;
}
public Object getColumnByIndex(int index){
return columns[index];
}
public void setColumns(Object[] columns) {
this.columns = columns;
}
}
每行包含多个列,这些列由分隔符分隔。无论file1是否包含10列和/或file2是否仅包含3列,这都无关紧要。
以下阅读器简单地将每一行映射到一条记录:
@Component
public class GenericReader {
@Autowired
private GenericLineMapper genericLineMapper;
@SuppressWarnings({ "unchecked", "rawtypes" })
public FlatFileItemReader reader(File file) {
FlatFileItemReader<Record> reader = new FlatFileItemReader();
reader.setResource(new FileSystemResource(file));
reader.setLineMapper((LineMapper) genericLineMapper.defaultLineMapper());
return reader;
}
}
映射器取一行并将其转换为对象数组:
@Component
public class GenericLineMapper {
@Autowired
private ApplicationConfiguration applicationConfiguration;
@SuppressWarnings({ "unchecked", "rawtypes" })
public DefaultLineMapper defaultLineMapper() {
DefaultLineMapper lineMapper = new DefaultLineMapper();
lineMapper.setLineTokenizer(tokenizer());
lineMapper.setFieldSetMapper(new CustomFieldSetMapper());
return lineMapper;
}
private DelimitedLineTokenizer tokenizer() {
DelimitedLineTokenizer tokenize = new DelimitedLineTokenizer();
tokenize.setDelimiter(Character.toString(applicationConfiguration.getDelimiter()));
tokenize.setQuoteCharacter(applicationConfiguration.getQuote());
return tokenize;
}
}
将列转换为记录的“魔法”发生在FieldSetMapper中:
@Component
public class CustomFieldSetMapper implements FieldSetMapper<Record> {
@Override
public Record mapFieldSet(FieldSet fieldSet) throws BindException {
Record record = new Record();
Object[] row = new Object[fieldSet.getValues().length];
for (int i = 0; i < fieldSet.getValues().length; i++) {
row[i] = fieldSet.getValues()[i];
}
record.setColumns(row);
return record;
}
}
使用yaml配置,用户可以提供一个输入目录和一个文件名列表,当然,如果列包含分隔符,则可以使用适当的分隔符和字符引用列。以下是此类yaml配置的示例:
@Component
@ConfigurationProperties
public class ApplicationConfiguration {
private String inputDir;
private List<String> fileNames;
private char delimiter;
private char quote;
// getters and setters ommitted
}
然后是应用程序。yml:
input-dir: src/main/resources/
file-names: [yourfile1.csv, yourfile2.csv, yourfile3.csv]
delimiter: "|"
quote: "\""
最后但并非最不重要的一点是,将所有这些放在一起:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private GenericReader genericReader;
@Autowired
private NoOpWriter noOpWriter;
@Autowired
private ApplicationConfiguration applicationConfiguration;
@Bean
public Job yourJobName() {
List<Step> steps = new ArrayList<>();
applicationConfiguration.getFileNames().forEach(f -> steps.add(loadStep(new File(applicationConfiguration.getInputDir() + f))));
return jobBuilderFactory.get("yourjobName")
.start(createParallelFlow(steps))
.end()
.build();
}
@SuppressWarnings("unchecked")
public Step loadStep(File file) {
return stepBuilderFactory.get("step-" + file.getName())
.<Record, Record> chunk(10)
.reader(genericReader.reader(file))
.writer(noOpWriter)
.build();
}
private Flow createParallelFlow(List<Step> steps) {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
// max multithreading = -1, no multithreading = 1, smart size = steps.size()
taskExecutor.setConcurrencyLimit(1);
List<Flow> flows = steps.stream()
.map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build())
.collect(Collectors.toList());
return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
.split(taskExecutor)
.add(flows.toArray(new Flow[flows.size()]))
.build();
}
}
出于演示目的,您可以将所有类放在一个包中。NoOpWriter只记录我的测试文件的第二列。
@Component
public class NoOpWriter implements ItemWriter<Record> {
@Override
public void write(List<? extends Record> items) throws Exception {
items.forEach(i -> System.out.println(i.getColumnByIndex(1)));
// NO - OP
}
}
祝你好运:-)
问题内容: 好的,这是我的难题,我建立了一个数据库,其中包含约5个表,所有表的数据结构完全相同。出于本地化的目的,以这种方式分离了数据,并总共分割了约450万条记录。 在大多数情况下,只需要一张桌子就可以了。但是,有时需要两个或多个表中的数据,并且需要按用户定义的列对数据进行排序。这就是我遇到的问题。 数据列: MySQL陈述: MySQL吐出这个错误: 显然,我做错了。有人愿意为我阐明一下吗?
让Person是一个具有属性name、age和idNumber的类。我希望有一个“人”的集合,我希望能够以最有效的方式执行以下操作: 通过他们的ID号检索他们。 获取年龄的人的列表 我的想法是同时维护一个使用id作为键的Hashmap和两个使用age和name作为每个树映射键的树映射。 这是最好的方式吗
我有两个文件命名为文章和类别。我使用SEO URL结构生成了一个URL格式。样品: 实例com/文章标题 实例com/类别标题 代码: 但有一个问题。我不能同时使用两种url格式。htaccess文件。他看到第一行,但忽略了另一行。但是我想对这两个文件使用相同的格式。你能帮忙吗?
我已经完成了连接Clickhouse服务器/客户端和创建表的任务。然后,我想将数据从csv导入到该表中。问题是ClickHouse中的DateTime类型需要这样的格式:YYYY-MM-DD hh:mm:ss,但是我下载的数据集只有这个时间格式:2016-01-13 6:15:00am(YYYY-MM-DD h:MM:ss)小时在我的数据集中只有h,应该是hh。请告诉我如何将csv文件中的所有数据
我在Spring有一个项目,我必须读两本。txt文件,一个有许多行,另一个是控制文件,其中包含应从第一个文件读取的行数。我知道我必须使用分区来处理这些文件,因为第一个文件非常大,我需要对其进行分区,并能够在失败时重新启动,但我不知道读者应该如何处理这些文件,因为两个文件的行宽度不同。没有一个文件的行中有标题或分隔符,因此我必须根据主要在第一行中的范围来获取字段<我的一个疑问是,我是否应该在同一个读
我正在尝试使用hadoop map reduce,但不是在映射器中一次映射每一行,而是一次映射整个文件。