我有一个要求,我们通过ECS S3 Pre-Signed url以字节流的形式接收csv文件。我必须验证数据并将验证成功和失败的记录写入2个不同的csv文件,并通过将它们转换为InputStream将它们存储到ECS S3存储桶。还将成功记录写入数据库以及入站、成功和失败文件的预签名url。
我是Spring Batch的新手。我应该如何处理这个要求?
如果我选择一个FlatFileItemReader来读取,ItemProcessor来处理数据,我应该如何写入不同的文件和数据库?
或
我应该使用Tasklet创建作业吗?TIA。
请查找下面的示例代码段。如果您遇到任何问题,请告诉我
//Your InputOutPut DTO This is the key object
Class BaseCSVDTO {
// yourCSVMappedFields
private SuccessCSVObject successObject;
private FailureCSVObject failureObject;
}
//Read the Files in reader as Normal better create a custom reader if you want to get more control
@Bean
public ItemReader<BaseCSVDTO> yourFlatFileItemReader() {
//populate mapped fields automatically by Springbatch
}
@Bean
public CSVProcessor csvValidationProcessor() {
return new CSVProcessor();
}
Class CSVProcessor implements ItemProcessor<BaseCSVDTO, BaseCSVDTO> {
@Override
public BaseCSVDTO CSVProcessor(BaseCSVDTO eachCSVitem) throws Exception {
//validateEachItem and put in Success or Failure Object
//Example of Success
SuccessCSVObject successObject = new SuccessCSVObject()
eachCSVitem.setSuccessObject(successObject);
//Same way for Failure object
}
}
@Bean
public CompositeItemWriter compositeWriter() throws Exception {
CompositeItemWriter compositeItemWriter = new CompositeItemWriter();
List<ItemWriter> writers = new ArrayList<ItemWriter>();
writers.add(successCSVWriter());
writers.add(failureCSVWriter());
compositeItemWriter.setDelegates(writers);
return compositeItemWriter;
}
@Bean
public YourItemWriter<BaseCSVDTO> successCSVWriter() {
return new SuccessWriter();
}
@Bean
public YourItemWriter<BaseCSVDTO> failureCSVWriter() {
return new FailureWriter;
}
public class SuccessWriter implements ItemWriter<BaseCSVDTO> {
@Override
public void write(List<? extends BaseCSVDTO> items){
for(BaseCSVDTO baseCSVDTO:items) {
baseCSVDTO.getSuccessObject
//write Success CSV
}
}
}
public class FailureWriter implements ItemWriter<BaseCSVDTO> {
@Override
public void write(List<? extends BaseCSVDTO> items){
for(BaseCSVDTO baseCSVDTO:items) {
//write Success CSV
baseCSVDTO.getFailureObject
}
}
}
/// Finally Job step
@Bean
public Step executionStep() throws Exception {
return stepBuilderFactory.get("executionStep").<BaseCSVDTO, BaseCSVDTO>chunk(chunkSize)
.reader(yourFlatFileItemReader()).processor(csvValidationProcessor()).writer(compositeWriter())
//.faultTolerant()
//.skipLimit(skipErrorCount).skip(Exception.class)//.noSkip(FileNotFoundException.class)
//.listener(validationListener())
//.noRetry(Exception.class)
//.noRollback(Exception.class)
.build();
}
我想将一个数据帧保存到两个不同的csv文件中(拆分数据帧)-一个文件只包含标题,另一个文件包含其余行。 我想将这两个文件保存在同一个目录下,这样Spark处理所有逻辑将是最好的选择,如果可能的话,而不是使用pandas分割csv文件。 最有效的方法是什么? 谢谢你的帮助!
不使用ByteBuffer:第一种方法 使用byte Buffer:还有一点是数据成员的大小将始终保持固定,即CharacterData=1byte、ShortData=1byte、IntegerData=2byte和StringData=3byte。所以这个类的总大小总是7byte 第二种方法 PS 如果我使用序列化,它还会写入单词“characterdata”、“shortdata”、“int
我正在尝试使用pyspark来分析我在数据砖笔记本上的数据。Blob 存储已装载到数据砖群集上,在分析后,希望将 csv 写回 blob 存储。由于 pyspark 以分布式方式工作,csv 文件被分解为小块并写入 Blob 存储。如何克服这个问题,并在我们使用pyspark进行分析时在blob上编写为单个csv文件。谢谢。
我有几个输出侦听器正在实现。它可以是写到stdout或文件的,也可以是写到内存或任何其他输出目标;因此,我在方法中指定作为(an)参数。 现在,我收到了。在这里向流写入的最佳方式是什么? 我应该只使用吗?我可以给它字节,但如果目标流是字符流,那么它会自动转换吗? 我需要用这里的一些桥流来代替吗?
问题内容: 我有几个正在实现OutputStream的输出侦听器。它可以是写到stdout或文件的PrintStream,也可以写到内存或任何其他输出目标。因此,我在方法中将OutputStream指定为参数。 现在,我已经收到了字符串。在此处写入流的最佳方法是什么? 我应该只使用Writer.write(message.getBytes())吗?我可以给它提供字节,但是如果目标流是字符流,那么它
我在想,在下面描述的场景中使用RXJava是否值得。 一次检索包含一组所需条目的列表,例如特定列的表中查询/选择的结果集。 然后需要在另一个数据源中执行新的查询,以获取for循环中第一个列表中每个元素的相关属性(每个元素一行或多行)。 最后,将检索到的属性集写入csv文件。 例如,可以从数据库中检索公司列表,在本例中,是《财富》杂志中列出的第100个,并将100个对象存储在列表os字符串中。 然后