我正在做Spring批次。我有一个分区步骤(对象列表),然后是读者和作家的从属步骤。
我想以并行模式执行processStep
。因此,我希望每个分区都有一个特定的读写器实例。
目前,创建的分区使用相同的读写器实例。因此,这些操作是在串行模式下完成的:读取和写入第一个分区,然后在第一个分区完成后对下一个分区执行相同的操作。
Spring Boot配置类:
@Configuration
@Import({ DataSourceConfiguration.class})
public class BatchConfiguration {
private final static int COMMIT_INTERVAL = 1;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier(value="mySqlDataSource")
private DataSource mySqlDataSource;
public static int GRID_SIZE = 3;
public static List<Pojo> myList;
@Bean
public Job myJob() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
return jobBuilderFactory.get("myJob")
.incrementer(new RunIdIncrementer())
.start(partitioningStep())
.build();
}
@Bean(name="partitionner")
public MyPartitionner partitioner() {
return new MyPartitionner();
}
@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(GRID_SIZE);
return taskExecutor;
}
@Bean
public Step partitioningStep() throws NonTransientResourceException, Exception {
return stepBuilderFactory.get("partitioningStep")
.partitioner("processStep", partitioner())
.step(processStep())
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step processStep() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
return stepBuilderFactory.get("processStep")
.<List<Pojo>, List<Pojo>> chunk(COMMIT_INTERVAL)
.reader(processReader())
.writer(processWriter())
.taskExecutor(taskExecutor())
.build();
}
@Bean
public ProcessReader processReader() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
return new ProcessReader();
}
@Bean
public ProcessWriter processWriter() {
return new ProcessWriter();
}
}
partitionner类
public class MyPartitionner implements Partitioner{
@Autowired
private IService service;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// list of 300 object partitionned like bellow
...
Map<String, ExecutionContext> partitionData = new HashMap<String, ExecutionContext>();
ExecutionContext executionContext0 = new ExecutionContext();
executionContext0.putString("from", Integer.toString(0));
executionContext0.putString("to", Integer.toString(100));
partitionData.put("Partition0", executionContext0);
ExecutionContext executionContext1 = new ExecutionContext();
executionContext1.putString("from", Integer.toString(101));
executionContext1.putString("to", Integer.toString(200));
partitionData.put("Partition1", executionContext1);
ExecutionContext executionContext2 = new ExecutionContext();
executionContext2.putString("from", Integer.toString(201));
executionContext2.putString("to", Integer.toString(299));
partitionData.put("Partition2", executionContext2);
return partitionData;
}
}
读者类
public class ProcessReader implements ItemReader<List<Pojo>>, ChunkListener {
@Autowired
private IService service;
private StepExecution stepExecution;
private static List<String> processedIntervals = new ArrayList<String>();
@Override
public List<Pojo> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
System.out.println("Instance reference: "+this.toString());
if(stepExecution.getExecutionContext().containsKey("from") && stepExecution.getExecutionContext().containsKey("to")){
Integer from = Integer.valueOf(stepExecution.getExecutionContext().get("from").toString());
Integer to = Integer.valueOf(stepExecution.getExecutionContext().get("to").toString());
if(from != null && to != null && !processedIntervals.contains(from + "" + to) && to < BatchConfiguration.myList.size()){
processedIntervals.add(String.valueOf(from + "" + to));
return BatchConfiguration.myList.subList(from, to);
}
}
return null;
}
@Override
public void beforeChunk(ChunkContext context) {
this.stepExecution = context.getStepContext().getStepExecution();
}
@Override
public void afterChunk(ChunkContext context) { }
@Override
public void afterChunkError(ChunkContext context) { }
}
}
作家班
public class ProcessWriter implements ItemWriter<List<Pojo>>{
private final static Logger LOGGER = LoggerFactory.getLogger(ProcessWriter.class);
@Autowired
private IService service;
@Override
public void write(List<? extends List<Pojo>> pojos) throws Exception {
if(!pojos.isEmpty()){
for(Pojo item : pojos.get(0)){
try {
service.remove(item.getId());
} catch (Exception e) {
LOGGER.error("Error occured while removing the item [" + item.getId() + "]", e);
}
}
}
}
}
你能告诉我我的密码有什么问题吗?
通过将@StepScope
添加到我的读写器bean声明中解决:
@Configuration
@Import({ DataSourceConfiguration.class})
public class BatchConfiguration {
...
@Bean
@StepScope
public ProcessReader processReader() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
return new ProcessReader();
}
@Bean
@StepScope
public ProcessWriter processWriter() {
return new ProcessWriter();
}
...
}
通过这种方式,您可以看到每个分区都有一个不同的chunck(读写器)实例。
本文向大家介绍在单步模式下运行8085程序,包括了在单步模式下运行8085程序的使用技巧和注意事项,需要的朋友参考一下 在单步模式下运行程序: 单步模式对于查找我们的程序非常有用,该程序在单次执行后未产生期望的结果。我们将其用于程序调试。类似地,就像要单独执行的操作一样,在“>”提示符下键入“ S”。我们可以清楚地注意到,地址<cr>的更改发生在S之后。给定的系统明确提示如下: 起始地址为:xxx
问题内容: 我有用Flask编写的Web应用程序。正如每个人的建议,我不能在生产中使用Flask。所以我想到了带有Flask的Gunicorn。 在Flask应用程序中,我正在加载一些机器学习模型。这些总大小为8GB。我的Web应用程序的并发性可以达到1000个请求。并且机器的RAM是15GB。 那么,运行此应用程序的最佳方法是什么? 问题答案: 你可以使用Gunicorn的多个工作人员或异步工作
前面的详细信息 我需要在交换机上使用ssh来ping不同的主机。早些时候,我为每个主机启动了一个线程,但结果很容易超过了最大ssh连接数,所以我根据这个创建了一个交互式shell会话。但当我并行运行时,它在发出第一个命令后就一直挂在那里。我不知道如何修复此问题。 简化代码如下:
我有下面同样的问题,但我想知道答案。Spring Boot:如何使用多个模式,并在运行时为每个请求动态选择使用哪一个模式 请帮助我找到答案 如何拥有一个数据库连接并为每个请求指定不同的模式? 提前谢谢你。
这个问题类似于如何运行github操作步骤,即使前一步失败,但仍然无法完成作业,但接受的答案对我没有帮助,因为它会创建一个额外的作业。 下面我要做的是 当测试应用程序(步骤2)通过时;测试清理步骤应该运行,github操作工作流返回成功 当测试应用程序(步骤2)失败时;应运行测试清洁、行动松弛和失败行动步骤。github操作工作流返回失败 我如何修复下面的代码来实现它?
我想使用 Confluent 的复制器将数据从一个系统复制到另一个系统。我正在使用两个Ubuntu 18.04系统,其中一个充当源,另一个充当目的地。 我尝试在分布式模式下运行kafka connect replicator,更改了以下配置: < li >在confluent/etc/Kafka/server . properties中,我做了以下更改 源 目的地 然后,我在源系统中创建了主题,并