当前位置: 首页 > 知识库问答 >
问题:

使用Spring batch partitioning和rabbitmq可以同时运行具有不同参数的同一作业的多个作业实例吗

郎喜
2023-03-14

我可以同时运行我的作业步骤,但我有点担心,如果我用不同的参数同时启动同一作业的多个实例,它将如何工作。

我正在使用importExchange作业导入Exchange数据,但如果我同时为不同的市场如美国市场、欧洲marktet启动importExchange作业。

Partitioner将输入exchange名称分区到不同的分区步骤执行上下文中,MessagePartitionHandler将stepExecutionRequests作为消息通过rabbitmq队列发送到不同的服务器,并在不同的服务器上并发执行步骤。

由于我们使用的是直接通道和交换,响应将只传递给一个侦听器,那么job1的响应会被Job2的侦听器拾取吗?

或者是否有路由器或过滤器选择性地选择回复?

我需要担心这个还是MessageChannelPartitionHandler来处理这个?或者我应该在应答队列前缀作业id?

<task:executor id="taskExecutor" pool-size="20" />

<int:channel id="importExchangesOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="importExchangesInboundStagingChannel" />

<amqp:outbound-gateway request-channel="importExchangesOutboundChannel"
    reply-channel="importExchangesInboundStagingChannel" amqp-template="importExchangesAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesOutboundChannel"
    p:receiveTimeout="150000" />


<beans:bean id="importExchangesPartitioner"
    class="org.springframework.batch.core.partition.support.FlatFilePartitioner"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    scope="step" />


<beans:bean id="importExchangesPartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="importExchangesStep" p:gridSize="6"
    p:messagingOperations-ref="importExchangesMessagingTemplate" />

<int:aggregator ref="importExchangesPartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="300000"
    input-channel="importExchangesInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="6"
    request-channel="importExchangesInboundChannel" receive-timeout="300000"
    reply-channel="importExchangesOutboundStagingChannel" queue-names="importExchangesQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />

<rabbit:template id="importExchangesAmqpTemplate" connection-factory="rabbitConnectionFactory"
    routing-key="importExchangesQueue" reply-timeout="300000">
</rabbit:template>

<int:channel id="importExchangesInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="importExchangesInboundChannel" output-channel="importExchangesOutboundStagingChannel" />

<int:channel id="importExchangesOutboundStagingChannel" />



<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<rabbit:direct-exchange name="${import.exchanges.exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${import.exchanges.queue}"
            key="${import.exchanges.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>


<beans:bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler"
    p:jobExplorer-ref="jobExplorer" p:stepLocator-ref="stepLocator" />


<beans:bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />


<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"
    p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
    scope="step" />


<beans:bean id="importExchangesFileItemReader"
    class="org.springframework.batch.item.file.MultiThreadedFlatFileItemReader"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    p:lineMapper-ref="stLineMapper" p:startAt="#{stepExecutionContext['startAt']}"
    p:maxItemCount="#{stepExecutionContext['itemsCount']}" scope="step" />

<step id="importExchangesStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
            commit-interval="${import.exchanges.commit.interval}" />
    </tasklet>
</step>

<job id="importExchangesJob" restartable="true">

    <step id="importExchangesStep.master" next="importEclsStep.master">
        <partition partitioner="importExchangesPartitioner"
            handler="importExchangesPartitionHandler" />
    </step>

</job>
<rabbit:template id="importExchangesAmqpTemplate" connection-factory="rabbitConnectionFactory"
    routing-key="importExchangesQueue" reply-timeout="300000">
</rabbit:template>

我的工作ID是2014-06-08和2014-06-09。我在文件夹名为2014-06-08和2014-06-09下创建了Exchanges.txt。

/home/ubuntu/tmp/spring/batch/2015-06-08/exchanges.txt
/home/ubuntu/tmp/spring/batch/2015-06-09/exchanges.txt

/home/ubuntu/tmp/spring/batch/2015-06-08/exchanges.txt文件中的数据是

1
2
3
up to 30

在/home/ubuntu/tmp/spring/batch/2015-06-09/exchanges.txt中

31
32
33
up to 60
<beans:bean id="importExchangesFileItemReader"
    class="org.springframework.batch.item.file.MultiThreadedFlatFileItemReader"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    p:lineMapper-ref="stLineMapper" p:startAt="#{stepExecutionContext['startAt']}"
    p:maxItemCount="#{stepExecutionContext['itemsCount']}" scope="step" />
<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.ecls.reply.timeout}"
    p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
    scope="step" />
@Override
public void write(List<? extends T> exchanges) throws Exception {

    commandRunner.setLogFilePath(this.logFilePath);

    for (T exchange : exchanges) {

        String command = commandRunner.getConsolePath() + " "
                + "st:import exchange" + " " + exchange.toString();

        commandRunner.run(command, this.replyTimeout);  
    }

}

里面的指挥官,

public void run(String command, long replyTimeout)
        throws Exception {

    String[] commands = command.split("\\s+");

    ProcessBuilder pb = new ProcessBuilder(commands);
    File log = new File(this.logFilePath);
    pb.redirectErrorStream(true);
    pb.redirectOutput(Redirect.appendTo(log));
    Process p = pb.start();
    .......
}

如果我只启动一个作业实例(8个,批处理id为2015-06-08或2015-06-09),一切都很好,但是如果我同时启动这两个作业实例的步骤输入数据会混合,我的意思是,这是我在日志文件中得到的

尾部-F/var/log/st/batch.log.2015-06-08

14 23 1 27 19 9 15 24 2 10 28 20 25 16 3 21 29 11 26 17 4 30 12 22 18 5 44 45 46
52 13 47 31 37 6 53 57 48 32 38 54 7 49 58 33 39 55 8 59 50 34 40 56 60 51 35 42 41 36 43
<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"
    p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
    scope="step" />

它是由于出站和入站网关而发生的吗?spring集成通道、网关等的不同实例是为不同的作业实例创建的,还是像所有作业实例都相同的rabbitmq队列?

入站网关具有concurrent-consumers=“8”这些使用者对于所有作业实例是相同的,还是将为每个作业实例创建单独的8个使用者?

这个处理程序可以为多个作业分区吗?

<beans:bean id="importExchangesPartitioner"
    class="org.springframework.batch.core.partition.support.FlatFilePartitioner"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    scope="step" />

共有1个答案

步博涉
2023-03-14

这一切都由框架为您处理好了。当分区处理程序发出步骤执行请求时,他在头中设置一个相关id...

.setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName())

聚合器使用它将所有响应(针对此作业)聚合成一条消息,当收到所有响应时,该消息将被释放给分区处理程序。

Message<Collection<StepExecution>> message = (Message<Collection<StepExecution>>) messagingGateway.receive(replyChannel);

这是使用序列大小标头实现的。

 类似资料:
  • 我配置了一个spring批处理作业,它在spring WebService中运行。这项工作有几个步骤。我已经在不同的tomcats中部署了这个webservice的两个实例(但两个实例都使用相同的mysql数据库)。 我希望用不同的参数在两个tomcats中同时运行spring批处理作业(每个tomcats中一个)。我没有使用分区,每个作业的参数是完全不同的。 我开始工作在一个汤姆猫和一切看起来很

  • 使用spring batch/spring boot,是否可以在每个线程中使用不同的多次启动具有读取器、处理器和写入器的spring batch作业? 我的用例: 我有许多不同的文件夹,我需要观看。如果新文件进入一个文件夹,我需要调用该作业,并在作业处理期间锁定该文件夹。 这可能发生在不同数量的文件夹中,这就是为什么我需要一个spring批处理作业的多个实例,但每次使用不同的。 每个文件夹一个作业

  • 我们的要求是同时写多个文件。我们正在使用spring批处理来编写文件,并且我们正在从不同的线程中启动spring批处理。每个线程都有自己的应用程序上下文。因此我们可以确保单例bean不会跨多个线程共享。下面是我的代码片段。 这就是我们调用spring批处理的方式。 ThreadPoolExecutor TPE=new ThreadPoolExecutor(10,10,1000000,TimeUni

  • 我是冲刺批次的新手,我找不到问题的答案。 我正在尝试使用Spring引导和Spring批处理实现JOB。我的JOB需要一个参数,所以我像这样执行应用程序: java-jar-Dspring。配置文件。活动=gus/应用程序/botbit批处理/botbit-batch-1.0.0。jar——Spring。一批工作名称=persistCustomerSuccessMetrics日期=2015年12月

  • 问题内容: 我有一个名为“开发”的工作,另一个名为“代码分析”的项目。目前,我们有两个不同的作业和不同的工作区,但是代码相同。有什么办法可以将同一工作空间用于多个作业? 我检查了Jenkins中可用的插件,但是没有找到合适的插件。 问题答案: 假设您的“开发” Jenkins工作空间为。在“代码分析”作业配置页面中,在选项卡下单击并选择选项,并提供与“开发”作业相同的工作空间。

  • 我有两个不同的工作(实际上更多,但为了简单起见,假设2)。每个作业可以与另一个作业并行运行,但同一作业的每个实例应该顺序运行(否则实例将共享彼此的资源)。 基本上,我希望这些作业中的每一个都有自己的作业实例队列。我想我可以使用两个不同的线程池作业启动程序(每个都有一个线程),并将一个作业启动程序与每个作业相关联。 在从Spring Batch Admin web UI中启动作业时,是否有一种方法可