我读过spring batch中的分区,我发现了一个演示分区的示例。该示例从CSV文件中读取人员,进行一些处理,并将数据插入数据库。在本例中,1 partitioning=1 file,因此partitioner实现如下所示:
public class MultiResourcePartitioner implements Partitioner {
private final Logger logger = LoggerFactory.getLogger(MultiResourcePartitioner.class);
public static final String FILE_PATH = "filePath";
private static final String PARTITION_KEY = "partition";
private final Collection<Resource> resources;
public MultiResourcePartitioner(Collection<Resource> resources) {
this.resources = resources;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
int i = 0;
for (Resource resource : resources) {
ExecutionContext context = new ExecutionContext();
context.putString(FILE_PATH, getPath(resource)); //Depends on what logic you want to use to split
map.put(PARTITION_KEY + i++, context);
}
return map;
}
private String getPath(Resource resource) {
try {
return resource.getFile().getPath();
} catch (IOException e) {
logger.warn("Can't get file from from resource {}", resource);
throw new RuntimeException(e);
}
}
}
但如果我有一个10TB的文件呢?spring批处理是否允许以某种方式对其进行分区?
我尝试了以下方法来实现我的目标:
分为两步——第一步将文件分成若干部分,第二步处理第一步之后得到的部分:
@Configuration
public class SingleFilePartitionedJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private ToLowerCasePersonProcessor toLowerCasePersonProcessor;
@Autowired
private DbPersonWriter dbPersonWriter;
@Autowired
private ResourcePatternResolver resourcePatternResolver;
@Value("${app.file-to-split}")
private Resource resource;
@Bean
public Job splitFileProcessingJob() throws IOException {
return jobBuilderFactory.get("splitFileProcessingJob")
.incrementer(new RunIdIncrementer())
.flow(splitFileIntoPiecesStep())
.next(csvToDbLowercaseMasterStep())
.end()
.build();
}
private Step splitFileIntoPiecesStep() throws IOException {
return stepBuilderFactory.get("splitFile")
.tasklet(new FileSplitterTasklet(resource.getFile()))
.build();
}
@Bean
public Step csvToDbLowercaseMasterStep() throws IOException {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setResources(resourcePatternResolver.getResources("split/*.csv"));
return stepBuilderFactory.get("csvReaderMasterStep")
.partitioner("csvReaderMasterStep", partitioner)
.gridSize(10)
.step(csvToDataBaseSlaveStep())
.taskExecutor(jobTaskExecutorSplitted())
.build();
}
@Bean
public Step csvToDataBaseSlaveStep() throws MalformedURLException {
return stepBuilderFactory.get("csvToDatabaseStep")
.<Person, Person>chunk(50)
.reader(csvPersonReaderSplitted(null))
.processor(toLowerCasePersonProcessor)
.writer(dbPersonWriter)
.build();
}
@Bean
@StepScope
public FlatFileItemReader csvPersonReaderSplitted(@Value("#{stepExecutionContext[fileName]}") String fileName) throws MalformedURLException {
return new FlatFileItemReaderBuilder()
.name("csvPersonReaderSplitted")
.resource(new UrlResource(fileName))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.build();
}
@Bean
public TaskExecutor jobTaskExecutorSplitted() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(30);
taskExecutor.setCorePoolSize(25);
taskExecutor.setThreadNamePrefix("cust-job-exec2-");
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
微线程:
public class FileSplitterTasklet implements Tasklet {
private final Logger logger = LoggerFactory.getLogger(FileSplitterTasklet.class);
private File file;
public FileSplitterTasklet(File file) {
this.file = file;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
int count = FileSplitter.splitTextFiles(file, 100);
logger.info("File was split on {} files", count);
return RepeatStatus.FINISHED;
}
}
分割文件的逻辑:
public static int splitTextFiles(File bigFile, int maxRows) throws IOException {
int fileCount = 1;
try (BufferedReader reader = Files.newBufferedReader(Paths.get(bigFile.getPath()))) {
String line = null;
int lineNum = 1;
Path splitFile = Paths.get(bigFile.getParent() + "/" + fileCount + "split.txt");
BufferedWriter writer = Files.newBufferedWriter(splitFile, StandardOpenOption.CREATE);
while ((line = reader.readLine()) != null) {
if (lineNum > maxRows) {
writer.close();
lineNum = 1;
fileCount++;
splitFile = Paths.get("split/" + fileCount + "split.txt");
writer = Files.newBufferedWriter(splitFile, StandardOpenOption.CREATE);
}
writer.append(line);
writer.newLine();
lineNum++;
}
writer.close();
}
return fileCount;
}
所以我把所有的文件都放进了特别的目录。
但这不起作用,因为在上下文初始化时,文件夹/split
还不存在。
我已经生成了一个可行的解决方案:
public class MultiResourcePartitionerWrapper implements Partitioner {
private final MultiResourcePartitioner multiResourcePartitioner = new MultiResourcePartitioner();
private final ResourcePatternResolver resourcePatternResolver;
private final String pathPattern;
public MultiResourcePartitionerWrapper(ResourcePatternResolver resourcePatternResolver, String pathPattern) {
this.resourcePatternResolver = resourcePatternResolver;
this.pathPattern = pathPattern;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
try {
Resource[] resources = resourcePatternResolver.getResources(pathPattern);
multiResourcePartitioner.setResources(resources);
return multiResourcePartitioner.partition(gridSize);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
但它看起来很难看。这是正确的解决方案吗?
Spring batch允许您进行分区,但如何进行分区取决于您。
您可以简单地在分区器类中拆分您的10TB文件(按数字或按最大行),每个拆分读取一个拆分的文件。您可以在java中找到很多如何拆分大文件的示例。按最大行拆分非常大的文本文件
嗨,我是新春批。 我有如下Spring批次的情况: 我需要运行所有促销的批处理[促销列表] > 在这里,我想再次从batch中读取上面的动态查询,因为它返回的结果至少为5万条记录。 以下是我所期待的过程,这在Spring批次中是否可行? 阅读促销【读者逐一阅读促销】 创建查询并将其放在上下文中 传递给下一个读者 读取器逐个读取事务 处理交易并计算积分 我这里的问题是不能写嵌套块[一个用于读取提升,
需要读取spring批处理中的文件,对其进行处理并将其作为一个提要保存。一个提要包含50%的信息。当我必须持久化提要的最终结果时,我需要使用公共字段将它们组合起来,并像一个项目一样持久化。请参见下面的示例。 我需要保留的最终信息如下: 请建议我如何在我的Spring批工作中实现这一点。 谢谢
但它不止一次地使用消息。有没有人面对过这个问题。此外,使用上述配置,使用者总是在一个批处理中只接收到一个消息。我尝试增加和,但没有任何影响。 在对ConcurrentKafkaListenerContainerFactory进行如下更改后,批处理配置的问题得到了解决: factory.getContainerProperties().SetackMode(org.springFramework.k
一段时间以来,我一直在寻找解决spring batch问题的方法。我应该使用spring batch从csv文件复制/创建新的csv文件。这里有一个例子: 下面是一个创建输出编号1的示例,例如File1: 其他输出文件也是一样的,但是你可以看到一些输出使用相同的源,事实上,我不能读取相同的数据两次来重新生成一个新的输出,所以我尝试将它们作为资源存储在地图上(资源将被使用不止一个),也就是说,我将
我有一个包含多个json文件的zip文件。我已解压缩它们,然后使用以下代码从json获取POJO对象: 但我需要使用spring批处理逐个处理这些json文件。有人能帮助我如何在spring批处理中实现这一点吗?我想使用1000块来读取json文件。我的json对象非常复杂。例子:
我正在处理许多没有固定头/列的CSV文件,说我可以得到file1。包含10列和文件2的csv。csv包含50列。 我无法提前知道我将拥有的列数,我无法为每种文件类型创建特定的作业,我的输入将是一个黑框:一堆CSV,它将具有从10到无限的X列数。 当我想使用Spring Batch自动导入这些CSV时,我想知道这是否可能?我知道我必须获得固定数量的列,因为处理器以及我需要将数据序列化为POJO然后再