当前位置: 首页 > 知识库问答 >
问题:

在Spring Batch中输入不同数据结构格式的多个文件

禄光霁
2023-03-14

根据我的研究,我知道Spring Batch提供API来处理许多不同类型的数据文件格式。

但我需要澄清我们如何在一个块/Tasklet中提供多个不同格式的文件。

为此,我知道有MultiResourceItemReader可以处理多个文件,但AFAIK所有文件都必须具有相同的格式和数据结构。

所以,问题是我们如何在一个Tasklet中提供多个不同数据格式的文件作为输入?

共有2个答案

郎飞航
2023-03-14

我不认为有现成的Spring批处理读取器用于多输入格式。

你必须自己建造。当然,您可以将现有的FileItemReader重新用作自定义文件读取器中的代理,对于每种文件类型/格式,请使用正确的文件类型/格式。

岳池暝
2023-03-14

Asoub是对的,没有现成的Spring批处理读取器可以“全部读取!”。然而,只需几个相当简单和直接的类,就可以创建一个java config spring批处理应用程序,该应用程序将处理不同文件格式的不同文件。

对于我的一个应用程序,我有一个类似类型的用例,我编写了一堆相当简单和直截了当的Spring Batch框架实现和扩展,以创建我所说的“通用”阅读器。所以回答你的问题:下面你会找到我用来使用Spring批处理不同类型文件格式的代码。显然,在下面你会找到剥离的实现,但它应该会让你朝着正确的方向前进。

一行由记录表示:

public class Record {

    private Object[] columns;

    public void setColumnByIndex(Object candidate, int index) {
        columns[index] = candidate;
    }

    public Object getColumnByIndex(int index){
        return columns[index];
    }

    public void setColumns(Object[] columns) {
        this.columns = columns;
    }
}

每行包含多个列,这些列由分隔符分隔。无论file1是否包含10列和/或file2是否仅包含3列,这都无关紧要。

以下阅读器简单地将每一行映射到一条记录:

@Component
public class GenericReader {

    @Autowired
    private GenericLineMapper genericLineMapper;

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public FlatFileItemReader reader(File file) {
        FlatFileItemReader<Record> reader = new FlatFileItemReader();
        reader.setResource(new FileSystemResource(file));
        reader.setLineMapper((LineMapper) genericLineMapper.defaultLineMapper());
        return reader;
    }
}

映射器取一行并将其转换为对象数组:

@Component
public class GenericLineMapper {

    @Autowired
    private ApplicationConfiguration applicationConfiguration;

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public DefaultLineMapper defaultLineMapper() {
        DefaultLineMapper lineMapper = new DefaultLineMapper();
        lineMapper.setLineTokenizer(tokenizer());       
        lineMapper.setFieldSetMapper(new CustomFieldSetMapper());
        return lineMapper;
    }

    private DelimitedLineTokenizer tokenizer() {
        DelimitedLineTokenizer tokenize = new DelimitedLineTokenizer();     
        tokenize.setDelimiter(Character.toString(applicationConfiguration.getDelimiter()));
        tokenize.setQuoteCharacter(applicationConfiguration.getQuote());
        return tokenize;
    }
}

将列转换为记录的“魔法”发生在FieldSetMapper中:

@Component
public class CustomFieldSetMapper implements FieldSetMapper<Record> {

    @Override
    public Record mapFieldSet(FieldSet fieldSet) throws BindException {
        Record record = new Record();
        Object[] row = new Object[fieldSet.getValues().length];
        for (int i = 0; i < fieldSet.getValues().length; i++) {
            row[i] = fieldSet.getValues()[i];
        }
        record.setColumns(row);
        return record;
    }
}

使用yaml配置,用户可以提供一个输入目录和一个文件名列表,当然,如果列包含分隔符,则可以使用适当的分隔符和字符引用列。以下是此类yaml配置的示例:

@Component
@ConfigurationProperties
public class ApplicationConfiguration {

    private String inputDir;
    private List<String> fileNames;
    private char delimiter;
    private char quote;

    // getters and setters ommitted
}

然后是应用程序。yml:

input-dir: src/main/resources/
file-names: [yourfile1.csv, yourfile2.csv, yourfile3.csv]
delimiter: "|"
quote: "\""

最后但并非最不重要的一点是,将所有这些放在一起:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private GenericReader genericReader;
    @Autowired
    private NoOpWriter noOpWriter;
    @Autowired
    private ApplicationConfiguration applicationConfiguration;

    @Bean
    public Job yourJobName() {
        List<Step> steps = new ArrayList<>();
        applicationConfiguration.getFileNames().forEach(f -> steps.add(loadStep(new File(applicationConfiguration.getInputDir() + f))));

        return jobBuilderFactory.get("yourjobName")                
                .start(createParallelFlow(steps))
                .end()
                .build();
    }

    @SuppressWarnings("unchecked")
    public Step loadStep(File file) {
        return stepBuilderFactory.get("step-" + file.getName())
                .<Record, Record> chunk(10)
                .reader(genericReader.reader(file))
                .writer(noOpWriter)
                .build();
    } 

    private Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        // max multithreading = -1, no multithreading = 1, smart size = steps.size()
        taskExecutor.setConcurrencyLimit(1); 

        List<Flow> flows = steps.stream()
                .map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build())
                .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
                .split(taskExecutor)
                .add(flows.toArray(new Flow[flows.size()]))
                .build();
    }
}

出于演示目的,您可以将所有类放在一个包中。NoOpWriter只记录我的测试文件的第二列。

@Component
public class NoOpWriter implements ItemWriter<Record> {

    @Override
    public void write(List<? extends Record> items) throws Exception {
        items.forEach(i -> System.out.println(i.getColumnByIndex(1)));      
        // NO - OP
    }
}

祝你好运:-)

 类似资料:
  • 问题内容: 好的,这是我的难题,我建立了一个数据库,其中包含约5个表,所有表的数据结构完全相同。出于本地化的目的,以这种方式分离了数据,并总共分割了约450万条记录。 在大多数情况下,只需要一张桌子就可以了。但是,有时需要两个或多个表中的数据,并且需要按用户定义的列对数据进行排序。这就是我遇到的问题。 数据列: MySQL陈述: MySQL吐出这个错误: 显然,我做错了。有人愿意为我阐明一下吗?

  • 让Person是一个具有属性name、age和idNumber的类。我希望有一个“人”的集合,我希望能够以最有效的方式执行以下操作: 通过他们的ID号检索他们。 获取年龄的人的列表 我的想法是同时维护一个使用id作为键的Hashmap和两个使用age和name作为每个树映射键的树映射。 这是最好的方式吗

  • 我有两个文件命名为文章和类别。我使用SEO URL结构生成了一个URL格式。样品: 实例com/文章标题 实例com/类别标题 代码: 但有一个问题。我不能同时使用两种url格式。htaccess文件。他看到第一行,但忽略了另一行。但是我想对这两个文件使用相同的格式。你能帮忙吗?

  • 我已经完成了连接Clickhouse服务器/客户端和创建表的任务。然后,我想将数据从csv导入到该表中。问题是ClickHouse中的DateTime类型需要这样的格式:YYYY-MM-DD hh:mm:ss,但是我下载的数据集只有这个时间格式:2016-01-13 6:15:00am(YYYY-MM-DD h:MM:ss)小时在我的数据集中只有h,应该是hh。请告诉我如何将csv文件中的所有数据

  • 我在Spring有一个项目,我必须读两本。txt文件,一个有许多行,另一个是控制文件,其中包含应从第一个文件读取的行数。我知道我必须使用分区来处理这些文件,因为第一个文件非常大,我需要对其进行分区,并能够在失败时重新启动,但我不知道读者应该如何处理这些文件,因为两个文件的行宽度不同。没有一个文件的行中有标题或分隔符,因此我必须根据主要在第一行中的范围来获取字段<我的一个疑问是,我是否应该在同一个读

  • 我正在尝试使用hadoop map reduce,但不是在映射器中一次映射每一行,而是一次映射整个文件。