当前位置: 首页 > 知识库问答 >
问题:

Spring batch admin远程分区步骤最多运行8个线程,即使并发度为10?

尹俊贤
2023-03-14

我正在为批处理使用spring批处理远程分区。我正在使用spring batch admin启动作业。

我将入站网关使用者并发步骤设置为10,但并行运行的分区最多为8个。

稍后我想将消费者并发增加到15。

下面是我的配置,

<task:executor id="taskExecutor" pool-size="50" />

<rabbit:template id="computeAmqpTemplate"
    connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
    reply-timeout="${compute.partition.timeout}">
</rabbit:template>

<int:channel id="computeOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="computeInboundStagingChannel" />

<amqp:outbound-gateway request-channel="computeOutboundChannel"
    reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="computeMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="computeOutboundChannel"
    p:receiveTimeout="${compute.partition.timeout}" />


<beans:bean id="computePartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="computeStep" p:gridSize="${compute.grid.size}"
    p:messagingOperations-ref="computeMessagingTemplate" />

<int:aggregator ref="computePartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
    input-channel="computeInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
    request-channel="computeInboundChannel" 
    reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<int:channel id="computeInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />

<int:channel id="computeOutboundStagingChannel" />

<beans:bean id="computePartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
    scope="step" />



<beans:bean id="computeFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
    scope="step" />

<beans:bean id="computeItemWriter"
    class="com.st.batch.foundation.writers.ComputeItemWriter"
    p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
    p:batchId="#{jobParameters[batch_id]}" scope="step" />


<step id="computeStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="computeFileItemReader" writer="computeItemWriter"
            commit-interval="${compute.commit.interval}" />
    </tasklet>
</step>

<flow id="computeFlow">
    <step id="computeStep.master">
        <partition partitioner="computePartitioner"
            handler="computePartitionHandler" />
    </step>
</flow>

<job id="computeJob" restartable="true">
    <flow id="computeJob.computeFlow" parent="computeFlow" />
</job>



compute.grid.size = 112
compute.consumer.concurrency = 10

Input files are splited to 112 equal parts = compute.grid.size = total number of partitions

Number of servers = 4.

有2个问题,

i) 尽管我已经将并发设置为10,但运行的最大线程数是8。

二)

有些速度较慢,因为其他进程在它们上运行,有些速度较快,所以我希望确保步骤执行是公平分配的,即如果更快的服务器完成了它们的执行,队列中的其他剩余执行应该交给它们。它不应该以循环方式分发。

我知道在Rabbitmq中有预取计数设置和ack模式可以分配。对于Spring集成,预取计数默认为1,ack模式默认为AUTO。但是仍然有一些服务器保持运行更多分区,即使其他服务器已经完成了很长时间。理想情况下,没有服务器应该闲置。

更新:

我现在观察到的另一件事是,对于使用split并行运行的一些步骤(不是使用远程分区分发的),也可以并行运行max 8。这看起来有点像线程池限制问题,但正如您所见,taskExecutor将池大小设置为50。

是否有任何在sping-batch-admin中限制并发运行步骤的数量?

第二次更新:

而且,如果有8个或更多线程在并行处理项中运行,Spring批处理管理员不会加载。它只是挂起。如果我减少并发,Spring批处理管理员负载。我甚至在一台服务器上设置并发4和其他服务器上设置并发8来测试它,Spring批处理管理员不会加载它我使用运行8个线程的服务器的URL,但它适用于运行4个线程的服务器。

Spring批处理管理管理器具有以下jobLauncher配置,

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>

<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />

那里的游泳池大小是6个,这与上述问题有关吗?

或者tomcat 7中是否有任何东西将运行的线程数限制为8?

共有2个答案

林龙野
2023-03-14

困惑-你说“我已将并发设置为10”,但随后显示compute。消费者并发性=8。因此,它正在按配置工作。如果属性设置为10,则不可能只有8个使用者线程。

从Rabbit的角度来看,所有的消费者都是平等的——如果一个慢盒上有10个消费者,而一个快盒上有10个消费者,而你只有10个分区,那么所有10个分区都有可能最终进入慢盒。

RabbitMQ不跨服务器分发工作,它只在消费者之间分发工作。

您可以通过减少并发来获得更好的分布。您还应该在较慢的框中设置较低的并发。

蔡宏大
2023-03-14

您是否在使用JobRepository数据库?

在执行期间,批处理框架会持续执行步骤,并且与JobRepository数据库的连接数可能会干扰并行步骤执行。

8的并发性让我觉得您可能正在使用BasicDataSource?如果是这样,请切换到类似于DriverManager数据源,然后查看。

 类似资料:
  • 我创建了一个多线程步骤,最大线程限制为10; 在处理了200万条记录文件后,我可以在日志文件中看到创建的线程太多,即使我将限制设置为10个线程。你能告诉我为什么吗?非常感谢。 2019-07-02T17:02:298968129857信息[batch-thread35348]com。db。wmdl。价格档案。工作一批听众。PriceFileReaderListener([])-PriceFileR

  • 同步 同步指的是线程之间的协作配合,以共同完成某个任务。在整个过程中,需要注意两个关键点:一是共享资源的访问, 二是访问资源的顺序。通过前面的介绍,我们已经知道了如何让多个线程访问共享资源,但并没介绍如何控制访问顺序,才不会出现错误。如果两个线程同时访问同一内存地址的数据,一个写,一个读,如果不加控制,写线程只写了一半,读线程就开始读,必然读到的数据是错误的,不可用的,从而造成程序错误,这就造成了

  • 我们在POC中使用远程分区,处理大约2000万条记录。为了处理这些记录,slave需要一些大约5000行的静态元数据。我们当前的POC使用EhCache从数据库一次将元数据加载到从机中,并将其放入缓存中,这样子用户调用就可以从缓存中获取这些数据,从而获得更好的性能。 现在,由于我们使用远程分区,我们的从机大约有20个MDP/线程,因此每个消息侦听器首先调用以从数据库获取元数据,因此基本上每个远程机

  • null null 任何解决问题的帮助/指针都是非常感谢的。 谢谢,哈尔·克里尚

  • 并发是什么?引用Rob Pike的经典描述: 并发是同一时间应对多件事情的能力 其实在我们身边就有很多并发的事情,比如一边上课,一边发短信;一边给小孩喂奶,一边看电视,只要你细心留意,就会发现许多类似的事。相应地,在软件的世界里,我们也会发现这样的事,比如一边写博客,一边听音乐;一边看网页,一边下载软件等等。显而易见这样会节约不少时间,干更多的事。然而一开始计算机系统并不能同时处理两件事,这明显满

  • 并行 理论上并行和语言并没有什么关系,所以在理论上的并行方式,都可以尝试用Rust来实现。本小节不会详细全面地介绍具体的并行理论知识,只介绍用Rust如何来实现相关的并行模式。 Rust的一大特点是,可以保证“线程安全”。而且,没有性能损失。更有意思的是,Rust编译器实际上只有Send Sync等基本抽象,而对“线程” “锁” “同步” 等基本的并行相关的概念一无所知,这些概念都是由库实现的。这