目前,我们正在将批处理作业从java迁移到spring batch。此批处理作业从数据库和web服务获取其输入。我们需要在四台服务器上运行此作业,以提高性能,因为此作业正在处理大量数据。
上述场景是否可以通过spring batch中的远程分区实现?
我浏览了远程分区文档,但它很难理解,我没有找到任何关于远程分区的可靠示例。
请帮帮我。
我想分享远程分区示例。您可以在此处找到所有来源
批处理配置:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@Import(value = {BrokerConfiguration.class})
public class MasterConfiguration {
private static final int GRID_SIZE = 3;
private final JobBuilderFactory jobBuilderFactory;
private final RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory;
public MasterConfiguration(JobBuilderFactory jobBuilderFactory,
RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.masterStepBuilderFactory = masterStepBuilderFactory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the master step
*/
@Bean
public Step masterStep() {
return this.masterStepBuilderFactory.get("masterStep")
.partitioner("workerStep", new BasicPartitioner())
.gridSize(GRID_SIZE)
.outputChannel(requests())
.inputChannel(replies())
.build();
}
@Bean
public Job remotePartitioningJob() {
return this.jobBuilderFactory.get("remotePartitioningJobMy")
.incrementer(new RunIdIncrementer())
.start(masterStep())
.build();
}
}
分割者:
public class BasicPartitioner extends SimplePartitioner {
private static final String PARTITION_KEY = "partition";
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitions = super.partition(gridSize);
int i = 0;
for (ExecutionContext context : partitions.values()) {
context.put(PARTITION_KEY, PARTITION_KEY + (i++));
}
return partitions;
}
}
代理配置:
@Configuration
@PropertySource("classpath:remote-partitioning.properties")
public class BrokerConfiguration {
@Value("${broker.url}")
private String brokerUrl;
@Bean
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(this.brokerUrl);
connectionFactory.setTrustAllPackages(true);
return connectionFactory;
}
}
开胃菜:
@EnableBatchProcessing
@SpringBootApplication
@Import({BasicPartitioner.class, BrokerConfiguration.class})
public class MasterApplication {
@Value("${broker.url}")
private String brokerUrl;
public static void main(String[] args) {
SpringApplication.run(MasterApplication.class, args);
}
@PostConstruct
public void init() throws Exception {
BrokerService broker = new BrokerService();
broker.addConnector(brokerUrl);
broker.start();
}
}
配置:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@Import(value = {BrokerConfiguration.class})
public class WorkerConfiguration {
private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
public WorkerConfiguration(RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
this.workerStepBuilderFactory = workerStepBuilderFactory;
}
/*
* Configure inbound flow (requests coming from the master)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* Configure outbound flow (replies going to the master)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* Configure the worker step
*/
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory.get("workerStep")
.inputChannel(requests())
.outputChannel(replies())
.tasklet(tasklet(null))
.build();
}
@Bean
@StepScope
public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
return (contribution, chunkContext) -> {
System.out.println("processing " + partition);
return RepeatStatus.FINISHED;
};
}
}
开胃菜:
@EnableBatchProcessing
@SpringBootApplication
@Import({BrokerConfiguration.class})
public class WorkerApplication {
public static void main(String[] args) {
SpringApplication.run(WorkerApplication.class, args);
}
}
是的,您所描述的内容将用于远程分区。我就Spring Batch中的可伸缩性选项做了一个演讲,您可以在这里查看:https://www.youtube.com/watch?v=CYTj5YT7CZU
运行它的代码示例可以在这里找到:https://github.com/mminella/Spring-Batch-Talk-2.0
主要内容:面向读者,前提条件,问题反馈Spring Batch是一个轻量级框架,用于在开发企业应用程序中批处理应用程序。 本教程解释了Spring Batch的基本概念,并展示了如何在实际环境中使用它。 面向读者 本教程对于那些需要处理大量涉及诸如事务管理,作业处理统计,资源管理等重复操作的记录的专业人员来说尤其有用。Spring Batch是处理大容量的非常有效的框架 批量作业。 前提条件 Spring Batch建立在Spring
远程引用是对远程仓库的引用(指针),包括分支、标签等等。 你可以通过 git ls-remote (remote) 来显式地获得远程引用的完整列表,或者通过 git remote show (remote) 获得远程分支的更多信息。 然而,一个更常见的做法是利用远程跟踪分支 {#2}。 远程跟踪分支 {#2}是远程分支状态的引用。 它们是你不能移动的本地引用,当你做任何网络通信操作时,它们会自动移
试着看看我是否能设计一个既需要分区又需要远程分块的工作。我们可以有类似于表A的东西来保存行(表A中的一列将是分区键),对于表A中的每一行,我们将有表B,其中包含表A中给定外部/分区键的许多子记录。我们需要运行一个查询,根据查询过滤表a中的分区键,并为每个分区键处理表B中的所有子记录(这里我们在表B中也会有数百万条记录,因此我们需要并行处理记录,从而实现远程分块) 对于这样的事情,什么是正确的思考s
我想用3个步骤建立一个批次。我想配置这个步骤,就像如果有100条记录,当step1读取、处理和写入一个10块时,step02,然后step03开始和结束,然后再次返回step1,读取下一个块。这在Spring批量可能吗?
问题内容: Spring Batch远程分块和远程分区有什么区别? 我无法理解Spring Batch中的远程分块和远程分区之间的区别。有人可以解释一下吗? 问题答案: 远程分区 分区是主/从步骤配置,允许并行处理数据分区。每个分区都是通过一些元数据来描述的。例如,如果您正在处理数据库表,则分区1可能是ID 0-100,分区2可能是101-200,等等。对于Spring Batch,主步骤使用分区
spring批处理远程分块和远程分区之间有什么区别? 我无法理解spring batch中远程分块和远程分区之间的区别。谁能解释一下吗?