当前位置: 首页 > 知识库问答 >
问题:

Spring批处理集成远程分区-运行并行作业

公西博实
2023-03-14

我们有一个用例,需要从一些分页的API读取数据,然后写入一些下游的Kafka主题。

我们已经能够通过Spring批处理集成远程分区来实现解决方案,其中管理器通过创建包含页码和偏移量以读取数据的执行上下文来处理任务的分区。管理器创建此执行上下文并将它们放在MessagingChannel上(我可以使用RabbitMQ和Kafka主题,以提供解决方案者为准)。工作人员(超过1个)从MessagingChannel中选择该执行上下文并完成从API读取数据并将其写入所需的Kafka主题的任务。

上面的实现工作得很好。如果我一个接一个地为不同的客户运行同一个作业,这也很好。当我们想要为多个客户端并行运行同一个作业时,挑战就来了。例如,我们并行地为两个客户机启动作业。它为每个客户创建一名经理和两名员工。现在问题来了,当两位经理在同一个消息通道上推送executionContext时,工人们不知道选择并执行哪一个。而且,这两个作业共享相同的数据库spring批处理表,因此我怀疑这也会在该级别上产生问题。

关于如何实现并行运行多个spring batch reporter分区作业的任何输入或参考。

更新[2022年1月18日]

我尝试在这里和下面将@StepScoped添加到MessageChannel分区处理程序是我得到的错误:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'partitioningMessageHandler': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:178) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:101) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1821) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getObjectForBeanInstance(AbstractAutowireCapableBeanFactory.java:1266) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.resolveTargetBeanFromMethodWithBeanAnnotation(AbstractMethodAnnotationPostProcessor.java:536) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:154) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod$1(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:912) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at spring.batch.integration.Manager.main(Manager.java:11) ~[main/:na]
Caused by: java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
    at org.springframework.util.Assert.state(Assert.java:94) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.validateFallbackMethods(MessagingMethodInvokerHelper.java:751) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:740) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:294) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:231) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.<init>(MethodInvokingMessageListProcessor.java:63) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.<init>(MethodInvokingMessageGroupProcessor.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:211) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:198) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:186) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:60) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:171) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    ... 20 common frames omitted

共有1个答案

端木宏盛
2023-03-14

在这种设置中,MessageChannelPartitionHandler应该是步骤范围。Javadoc中有一个关于这一点的注释:

Note: The reply channel for this is instance based.
Sharing this component across multiple step instances may result in the
crossing of messages. It's recommended that this component be step or job scoped.

将此bean步骤限定为作用域应该可以解决这个问题。

 类似资料:
  • 我使用spring batch来执行一些calcul,在reader中,我必须获得一个大数据,以便在处理器/写入器中处理,这个过程需要大量的(RAM)。因此,我尝试使用分区器拆分步骤,如下所示: 但是分区也徒劳地占用了大量内存,因为步骤(从机)是并行执行的,我想做的是拆分步骤并连续执行线程(而不是并行执行)以减少内存使用(RAM),这可能吗?

  • 我有一个作业流,我希望以以下方式运行它: 作业流将从Job1开始。在Job1成功完成后,Job1将同时启动Job2和Job4。 Job2和Job4将并行运行。 在Job2成功完成后,Job2将启动Job3。 在Job4成功完成后,Job4将启动Job5。 下面是job1.xml和job1的作业启动器类的代码片段: job1.xml uijobLauncher.java “job2,Job3”对和“

  • 我在运行Spring批处理作业时遇到了一个技术问题。作业只是从DB(MongoDB)读取记录,对记录进行一些计算(聚合)并将记录结果写入另一个表。读取A、处理A、写入记录B B是A的许多记录的聚合。我想使用远程分块来垂直扩展我的系统,从而使处理部分缩放和快速。我面临的问题是,我需要同步A记录,以便在将结果写入B时处理它们不会发生冲突。如果我将10条A记录分发给4个从站,它们在将聚合结果写入B时会发

  • spring批处理远程分块和远程分区之间有什么区别? 我无法理解spring batch中远程分块和远程分区之间的区别。谁能解释一下吗?

  • 我试图配置我的第一个多线程作业。我们有大约200,000条记录的主目录,我们需要处理。我想将文件分解为10个文件并处理它们。拆分文件tasklet工作正常 主步骤在我的配置中运行,但从步骤不运行。下面是我的配置。 分割者: MultiResourceItemReader: FlatFileItemWriter: 作业配置: 从属步骤配置: 请告知我做错了什么。我没有看到处理器urlFileItem

  • 试着看看我是否能设计一个既需要分区又需要远程分块的工作。我们可以有类似于表A的东西来保存行(表A中的一列将是分区键),对于表A中的每一行,我们将有表B,其中包含表A中给定外部/分区键的许多子记录。我们需要运行一个查询,根据查询过滤表a中的分区键,并为每个分区键处理表B中的所有子记录(这里我们在表B中也会有数百万条记录,因此我们需要并行处理记录,从而实现远程分块) 对于这样的事情,什么是正确的思考s