我正在为批处理使用spring批处理远程分区。我正在使用spring batch admin启动作业。
我将入站网关使用者并发步骤设置为10,但并行运行的分区最多为8个。
稍后我想将消费者并发增加到15。
下面是我的配置,
<task:executor id="taskExecutor" pool-size="50" />
<rabbit:template id="computeAmqpTemplate"
connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
reply-timeout="${compute.partition.timeout}">
</rabbit:template>
<int:channel id="computeOutboundChannel">
<int:dispatcher task-executor="taskExecutor" />
</int:channel>
<int:channel id="computeInboundStagingChannel" />
<amqp:outbound-gateway request-channel="computeOutboundChannel"
reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<beans:bean id="computeMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="computeOutboundChannel"
p:receiveTimeout="${compute.partition.timeout}" />
<beans:bean id="computePartitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
p:stepName="computeStep" p:gridSize="${compute.grid.size}"
p:messagingOperations-ref="computeMessagingTemplate" />
<int:aggregator ref="computePartitionHandler"
send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
input-channel="computeInboundStagingChannel" />
<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
request-channel="computeInboundChannel"
reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<int:channel id="computeInboundChannel" />
<int:service-activator ref="stepExecutionRequestHandler"
input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />
<int:channel id="computeOutboundStagingChannel" />
<beans:bean id="computePartitioner"
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
scope="step" />
<beans:bean id="computeFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader"
p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
scope="step" />
<beans:bean id="computeItemWriter"
class="com.st.batch.foundation.writers.ComputeItemWriter"
p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
p:batchId="#{jobParameters[batch_id]}" scope="step" />
<step id="computeStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="computeFileItemReader" writer="computeItemWriter"
commit-interval="${compute.commit.interval}" />
</tasklet>
</step>
<flow id="computeFlow">
<step id="computeStep.master">
<partition partitioner="computePartitioner"
handler="computePartitionHandler" />
</step>
</flow>
<job id="computeJob" restartable="true">
<flow id="computeJob.computeFlow" parent="computeFlow" />
</job>
compute.grid.size = 112
compute.consumer.concurrency = 10
Input files are splited to 112 equal parts = compute.grid.size = total number of partitions
Number of servers = 4.
有2个问题,
i) 尽管我已经将并发设置为10,但运行的最大线程数是8。
二)
有些速度较慢,因为其他进程在它们上运行,有些速度较快,所以我希望确保步骤执行是公平分配的,即如果更快的服务器完成了它们的执行,队列中的其他剩余执行应该交给它们。它不应该以循环方式分发。
我知道在Rabbitmq中有预取计数设置和ack模式可以分配。对于Spring集成,预取计数默认为1,ack模式默认为AUTO。但是仍然有一些服务器保持运行更多分区,即使其他服务器已经完成了很长时间。理想情况下,没有服务器应该闲置。
更新:
我现在观察到的另一件事是,对于使用split并行运行的一些步骤(不是使用远程分区分发的),也可以并行运行max 8。这看起来有点像线程池限制问题,但正如您所见,taskExecutor将池大小设置为50。
是否有任何在sping-batch-admin中限制并发运行步骤的数量?
第二次更新:
而且,如果有8个或更多线程在并行处理项中运行,Spring批处理管理员不会加载。它只是挂起。如果我减少并发,Spring批处理管理员负载。我甚至在一台服务器上设置并发4和其他服务器上设置并发8来测试它,Spring批处理管理员不会加载它我使用运行8个线程的服务器的URL,但它适用于运行4个线程的服务器。
Spring批处理管理管理器具有以下jobLauncher配置,
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>
<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />
那里的游泳池大小是6个,这与上述问题有关吗?
或者tomcat 7中是否有任何东西将运行的线程数限制为8?
困惑-你说“我已将并发设置为10”,但随后显示compute。消费者并发性=8
。因此,它正在按配置工作。如果属性设置为10,则不可能只有8个使用者线程。
从Rabbit的角度来看,所有的消费者都是平等的——如果一个慢盒上有10个消费者,而一个快盒上有10个消费者,而你只有10个分区,那么所有10个分区都有可能最终进入慢盒。
RabbitMQ不跨服务器分发工作,它只在消费者之间分发工作。
您可以通过减少并发来获得更好的分布。您还应该在较慢的框中设置较低的并发。
您是否在使用JobRepository数据库?
在执行期间,批处理框架会持续执行步骤,并且与JobRepository数据库的连接数可能会干扰并行步骤执行。
8的并发性让我觉得您可能正在使用BasicDataSource
?如果是这样,请切换到类似于DriverManager数据源
,然后查看。
我创建了一个多线程步骤,最大线程限制为10; 在处理了200万条记录文件后,我可以在日志文件中看到创建的线程太多,即使我将限制设置为10个线程。你能告诉我为什么吗?非常感谢。 2019-07-02T17:02:298968129857信息[batch-thread35348]com。db。wmdl。价格档案。工作一批听众。PriceFileReaderListener([])-PriceFileR
同步 同步指的是线程之间的协作配合,以共同完成某个任务。在整个过程中,需要注意两个关键点:一是共享资源的访问, 二是访问资源的顺序。通过前面的介绍,我们已经知道了如何让多个线程访问共享资源,但并没介绍如何控制访问顺序,才不会出现错误。如果两个线程同时访问同一内存地址的数据,一个写,一个读,如果不加控制,写线程只写了一半,读线程就开始读,必然读到的数据是错误的,不可用的,从而造成程序错误,这就造成了
我们在POC中使用远程分区,处理大约2000万条记录。为了处理这些记录,slave需要一些大约5000行的静态元数据。我们当前的POC使用EhCache从数据库一次将元数据加载到从机中,并将其放入缓存中,这样子用户调用就可以从缓存中获取这些数据,从而获得更好的性能。 现在,由于我们使用远程分区,我们的从机大约有20个MDP/线程,因此每个消息侦听器首先调用以从数据库获取元数据,因此基本上每个远程机
null null 任何解决问题的帮助/指针都是非常感谢的。 谢谢,哈尔·克里尚
并发是什么?引用Rob Pike的经典描述: 并发是同一时间应对多件事情的能力 其实在我们身边就有很多并发的事情,比如一边上课,一边发短信;一边给小孩喂奶,一边看电视,只要你细心留意,就会发现许多类似的事。相应地,在软件的世界里,我们也会发现这样的事,比如一边写博客,一边听音乐;一边看网页,一边下载软件等等。显而易见这样会节约不少时间,干更多的事。然而一开始计算机系统并不能同时处理两件事,这明显满
并行 理论上并行和语言并没有什么关系,所以在理论上的并行方式,都可以尝试用Rust来实现。本小节不会详细全面地介绍具体的并行理论知识,只介绍用Rust如何来实现相关的并行模式。 Rust的一大特点是,可以保证“线程安全”。而且,没有性能损失。更有意思的是,Rust编译器实际上只有Send Sync等基本抽象,而对“线程” “锁” “同步” 等基本的并行相关的概念一无所知,这些概念都是由库实现的。这