当前位置: 首页 > 知识库问答 >
问题:

MessageChannelPartitionHandler使用来自QueueChannel的第一个可用答复,从而完成错误的作业

瞿和硕
2023-03-14

我看到的问题和这里说的一样

http://forum.spring.io/forum/spring-projects/integration/724241-s-integration-s-batch-remote-step-execution-problem?view=thread

我有多个作业正在运行,这些作业的步骤使用下面定义的相同父分区hanldler“parent PartitionHandler”

<bean id="parentPartitionHandler"class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outboundReplies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="outboundRequests"/>
            <property name="receiveTimeout" value="60000000"/>
        </bean>
    </property>
    <property name="stepName" value="parentPartitionStep"/>
</bean>

我的所有作业都有如下类似的配置,第2步“学生分区和处理步骤”是分区步骤

<job id="studentLoadJob" xmlns="http://www.springframework.org/schema/batch"
     job-repository="jobRepository" restartable="true" parent="abstractJob">
    <step id="studentLoadStep" parent="parentLoadStep" next="studentPartitionAndProcessStep"/>
    <step id="studentPartitionAndProcessStep" next="studentCleanupStep">
        <partition partitioner="filePartitioner" handler="studentPartitionHandler"/>
    </step>
    <step id="studentCleanupStep" parent="parentCleanupStep"/>
</job>

<bean id="studentPartitionHandler"
      parent="parentPartitionHandler">
    <property name="stepName" value="studentPartitionStep"/>
</bean>

我在这里使用了相同的主配置https://github.com/mminella/spring-batch-talk-2.0/blob/master/src/main/resources/meta-inf/remotePartition.xml

<int:channel id="outboundReplies">
 <int:queue/>
 </int:channel>

<int:channel id="inboundStaging">
</int:channel>

<int:aggregator ref="parentPartitionHandler" send-partial-result-on-expiry="true" send-timeout="60000000"
                input-channel="inboundStaging" output-channel="outboundReplies"
                expire-groups-upon-completion="true"/>

我遇到的问题是aggregator似乎正确地收集了一个组的消息,但在MessageChannelPartitionHandler中,下面的语句接收第一条可用消息时没有从消息中获取头部信息。

Message<Collection<StepExecution>> message = (Message<Collection<StepExecution>>) messagingGateway.receive(replyChannel);

因此,PartitionHandler处理jobExecutionA而不是jobExecutionB的结果,因此它完成了一个错误的作业。

似乎MessageChannelPartitionHandler使用来自QueueChannel(在我的配置中是outboundReplies)的答复,而不考虑correlationId,而是第一条可用的消息。它工作了几次,然后它不工作,然后当我调试时,我发现同样的事情发生在我身上,就像这里的帖子一样

http://forum.spring.io/forum/spring-projects/integration/724241-s-integration-s-batch-remote-step-execution-problem?view=thread

我在这里做错了什么吗?
如果您需要,我可以提供更多的配置。

编辑:
网关都工作正常。我还试图使用适配器并添加header-enricher,以便将replyChannel对象添加到Header中。我添加了一个header-enricher元素,但我可能使用错误,因为当聚合消息时,hanlder抛出“没有outputChannel或replyChannel header可用”。所有分区请求都在outboundRequests通道上发送到rabbit队列,从服务器将从inboundRequestschannel接收请求,服务激活器将处理这些请求并在outboundStaging通道上发送回应答队列。在主端分区,聚合器从inboundStaging通道读取响应消息

<int:channel id="outboundRequests">
    <int:dispatcher task-executor="taskExecutor" failover="true"/>
</int:channel>

<int:channel id="inboundStaging"/>
<int:channel id="inboundRequests"/>
<int:channel id="outboundStaging"/>
<int:channel id="setHeaderPartionHandlerReplyChannel"/>


<int-amqp:outbound-channel-adapter
        id="filePartitionRequestOutboundGateway"
        channel="outboundRequests"
        amqp-template="rabbitTemplate"
        exchange-name="${rabbitmq.classflow.exchange}"
        routing-key="${rabbitmq.classflow.batch.partition.routingkey}"
        mapped-request-headers="*"
        />


<int-amqp:inbound-channel-adapter
        id="filePartitionRequestInboundGateway"
        concurrent-consumers="${rabbitmq.classflow.batch.partition.consumers}"
        channel="inboundRequests"
        receive-timeout="60000000"
        queue-names="${rabbitmq.classflow.batch.partition.queuename.request}"
        connection-factory="rabbitConnectionFactory"
        mapped-request-headers="*"
        />

<int:header-enricher input-channel="setHeaderPartionHandlerReplyChannel" output-channel="outboundRequests">
    <int:header-channels-to-string/>
</int:header-enricher>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inboundRequests"
                       output-channel="outboundStaging"/>


<int-amqp:outbound-channel-adapter
        id="filePartitionRepyOutboundGateway"
        channel="outboundStaging"
        amqp-template="rabbitTemplate"
        exchange-name="${rabbitmq.classflow.exchange}"
        routing-key="${rabbitmq.classflow.batch.partition.queuename.reply.routingkey}"
        mapped-request-headers="*"
 />

<int-amqp:inbound-channel-adapter
        id="filePartitionRepyInboundGateway"
        channel="inboundStaging"
        queue-names="${rabbitmq.classflow.batch.partition.queuename.reply}"
        connection-factory="rabbitConnectionFactory"
        concurrent-consumers="${rabbitmq.classflow.batch.tenant.job.consumers}"
        mapped-request-headers="*"
   />

<int:aggregator ref="parentPartitionHandler"
                send-partial-result-on-expiry="true"
                send-timeout="60000000"
                input-channel="inboundStaging"/>

<bean id="parentPartitionHandler"
      class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="gridSize" value="3"/>
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="setHeaderPartionHandlerReplyChannel"/>
            <property name="receiveTimeout" value="60000000"/>
        </bean>
    </property>
    <property name="stepName" value="parentPartitionStep"/>
</bean>

谢谢
Mallikarjun

共有1个答案

公沈浪
2023-03-14

这个问题在Spring Batch 3.0.3中得到了修复,只要您不将特定的ReplyChannel注入到分区处理程序中。

spring-batch-integration现在是主Spring Batch项目的一部分。

编辑

  • 添加一个带有 元素的header enricher(请参见header Channel注册表文档);这将活动标头通道对象转换为通道注册表的键。
  • 配置适配器以映射replychannel标头(例如mapped-request-headers=“*”)。

网关不需要这样做,因为当收到答复时,它持有对出站消息的引用。

 类似资料:
  • 我要做的是异步计算树结构的深度,我将有树的第一层,我想启动一个异步线程来分别计算每个节点的深度。 在计算过程中,树中显然可能有一个分叉,在这一点上,我想踢一个额外的线程来计算那个分支。 我已经得到了这个工作,但我需要做一些整理逻辑,当所有这些未来完成。但我对这一过程中产生的额外的可完成的未来感到困扰。 我会用什么方法来保存所有开始的CompletableFutures+那些动态创建的,并且在执行任

  • 我想要一个完整的未来,只发出完成的信号(例如,我没有返回值)。 我可以将CompletableFuture实例化为: 但是我应该向完整的方法提供什么呢?例如,我不能做

  • 获取java.net.socketException:以下socket创建的SOCKS服务器异常的错误回复。我没有使用任何SOCKS服务器&它只是一个独立的程序,这是起跑线&仅在这里获取异常。我也使用反射API。 从其他方法调用,如下所示:

  • 如何使用完全未来使用第一个可调用任务的结果作为参数,所有后续的可调用任务?我有3个任务需要这样运行: 第一个阻塞任务运行并返回一个值 我试图在下面这样做,但我被困在子句。 我无法让这个代码正常工作。在子句,如何从返回的对象响应传递参数? 注意:我确实希望在未来结束时,将所有3个任务的响应作为一个组合结果列表收集在一起,或许作为一个整数值流?在这种情况下,我想求和这些值。我希望通过多线程来提高性能。

  • 我有一个实例列表。 如何将他们转变成这样一个未来:

  • 问题内容: 在该线程中,我找到了一种在中实现功能的方法(并且,但与此无关)。 尝试在中实施此功能时,会引发一些异常。以下代码将演示该问题。它是如此简单,与我使用的方式非常相似,效果很好。 有人可以帮我找到解决方案吗? 下面的代码: ( 请注意,我使用) 这是错误消息我得到: ( 这是一个有点长:d ) 问题答案: 您的代码中没有涉及。我认为应该读这样的东西… 如果你想绑定到(以便文本字段将更新的选