当前位置: 首页 > 知识库问答 >
问题:

Spring Batch-读取字节流、处理、写入2个不同的csv文件将它们转换为输入流并将其存储到ECS,然后写入数据库

姜明贤
2023-03-14

我有一个要求,我们通过ECS S3 Pre-Signed url以字节流的形式接收csv文件。我必须验证数据并将验证成功和失败的记录写入2个不同的csv文件,并通过将它们转换为InputStream将它们存储到ECS S3存储桶。还将成功记录写入数据库以及入站、成功和失败文件的预签名url。

我是Spring Batch的新手。我应该如何处理这个要求?

如果我选择一个FlatFileItemReader来读取,ItemProcessor来处理数据,我应该如何写入不同的文件和数据库?

我应该使用Tasklet创建作业吗?TIA。

共有1个答案

章高朗
2023-03-14

请查找下面的示例代码段。如果您遇到任何问题,请告诉我

 //Your InputOutPut DTO This is the key object
   Class BaseCSVDTO {
    // yourCSVMappedFields  
    private SuccessCSVObject successObject;
    private FailureCSVObject failureObject;
   }

   //Read the Files in reader as Normal better create a custom reader if you want to get more control
    @Bean
    public ItemReader<BaseCSVDTO> yourFlatFileItemReader() {
        
         //populate mapped fields automatically by Springbatch
    }

    @Bean
    public CSVProcessor csvValidationProcessor() {
        return new CSVProcessor();
    }
    
    Class CSVProcessor implements ItemProcessor<BaseCSVDTO, BaseCSVDTO> {
        @Override
        public BaseCSVDTO CSVProcessor(BaseCSVDTO eachCSVitem) throws Exception {
            //validateEachItem and put in Success or Failure Object
            //Example of Success
                SuccessCSVObject successObject = new SuccessCSVObject()
                eachCSVitem.setSuccessObject(successObject);
            //Same way for Failure object   
        }
    }

   @Bean
    public CompositeItemWriter compositeWriter() throws Exception {
        CompositeItemWriter compositeItemWriter = new CompositeItemWriter();
        List<ItemWriter> writers = new ArrayList<ItemWriter>();
        writers.add(successCSVWriter());
        writers.add(failureCSVWriter());
        compositeItemWriter.setDelegates(writers);
        return compositeItemWriter;
    }

    @Bean
    public YourItemWriter<BaseCSVDTO> successCSVWriter() {
        return new SuccessWriter();
    }

    @Bean
    public YourItemWriter<BaseCSVDTO> failureCSVWriter() {
        return new FailureWriter;
    }

    
    public class SuccessWriter implements ItemWriter<BaseCSVDTO> {
        @Override
        public void write(List<? extends BaseCSVDTO> items){
        for(BaseCSVDTO baseCSVDTO:items) {
            baseCSVDTO.getSuccessObject
          //write Success CSV 
        }
        }
    }

  public class FailureWriter implements ItemWriter<BaseCSVDTO> {
        @Override
        public void write(List<? extends BaseCSVDTO> items){
        for(BaseCSVDTO baseCSVDTO:items) {
          //write Success CSV 
          baseCSVDTO.getFailureObject
        }
        }
    }

    /// Finally Job step
    @Bean
    public Step executionStep() throws Exception {
        return stepBuilderFactory.get("executionStep").<BaseCSVDTO, BaseCSVDTO>chunk(chunkSize)
                .reader(yourFlatFileItemReader()).processor(csvValidationProcessor()).writer(compositeWriter())
                //.faultTolerant()
                //.skipLimit(skipErrorCount).skip(Exception.class)//.noSkip(FileNotFoundException.class)
                //.listener(validationListener())
                //.noRetry(Exception.class)
                //.noRollback(Exception.class)
                .build();
    }
 类似资料:
  • 我想将一个数据帧保存到两个不同的csv文件中(拆分数据帧)-一个文件只包含标题,另一个文件包含其余行。 我想将这两个文件保存在同一个目录下,这样Spark处理所有逻辑将是最好的选择,如果可能的话,而不是使用pandas分割csv文件。 最有效的方法是什么? 谢谢你的帮助!

  • 不使用ByteBuffer:第一种方法 使用byte Buffer:还有一点是数据成员的大小将始终保持固定,即CharacterData=1byte、ShortData=1byte、IntegerData=2byte和StringData=3byte。所以这个类的总大小总是7byte 第二种方法 PS 如果我使用序列化,它还会写入单词“characterdata”、“shortdata”、“int

  • 我正在尝试使用pyspark来分析我在数据砖笔记本上的数据。Blob 存储已装载到数据砖群集上,在分析后,希望将 csv 写回 blob 存储。由于 pyspark 以分布式方式工作,csv 文件被分解为小块并写入 Blob 存储。如何克服这个问题,并在我们使用pyspark进行分析时在blob上编写为单个csv文件。谢谢。

  • 我有几个输出侦听器正在实现。它可以是写到stdout或文件的,也可以是写到内存或任何其他输出目标;因此,我在方法中指定作为(an)参数。 现在,我收到了。在这里向流写入的最佳方式是什么? 我应该只使用吗?我可以给它字节,但如果目标流是字符流,那么它会自动转换吗? 我需要用这里的一些桥流来代替吗?

  • 问题内容: 我有几个正在实现OutputStream的输出侦听器。它可以是写到stdout或文件的PrintStream,也可以写到内存或任何其他输出目标。因此,我在方法中将OutputStream指定为参数。 现在,我已经收到了字符串。在此处写入流的最佳方法是什么? 我应该只使用Writer.write(message.getBytes())吗?我可以给它提供字节,但是如果目标流是字符流,那么它

  • 我在想,在下面描述的场景中使用RXJava是否值得。 一次检索包含一组所需条目的列表,例如特定列的表中查询/选择的结果集。 然后需要在另一个数据源中执行新的查询,以获取for循环中第一个列表中每个元素的相关属性(每个元素一行或多行)。 最后,将检索到的属性集写入csv文件。 例如,可以从数据库中检索公司列表,在本例中,是《财富》杂志中列出的第100个,并将100个对象存储在列表os字符串中。 然后