我看到的问题和这里说的一样
http://forum.spring.io/forum/spring-projects/integration/724241-s-integration-s-batch-remote-step-execution-problem?view=thread
我有多个作业正在运行,这些作业的步骤使用下面定义的相同父分区hanldler“parent PartitionHandler”
<bean id="parentPartitionHandler"class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outboundReplies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outboundRequests"/>
<property name="receiveTimeout" value="60000000"/>
</bean>
</property>
<property name="stepName" value="parentPartitionStep"/>
</bean>
我的所有作业都有如下类似的配置,第2步“学生分区和处理步骤”是分区步骤
<job id="studentLoadJob" xmlns="http://www.springframework.org/schema/batch"
job-repository="jobRepository" restartable="true" parent="abstractJob">
<step id="studentLoadStep" parent="parentLoadStep" next="studentPartitionAndProcessStep"/>
<step id="studentPartitionAndProcessStep" next="studentCleanupStep">
<partition partitioner="filePartitioner" handler="studentPartitionHandler"/>
</step>
<step id="studentCleanupStep" parent="parentCleanupStep"/>
</job>
<bean id="studentPartitionHandler"
parent="parentPartitionHandler">
<property name="stepName" value="studentPartitionStep"/>
</bean>
我在这里使用了相同的主配置https://github.com/mminella/spring-batch-talk-2.0/blob/master/src/main/resources/meta-inf/remotePartition.xml
<int:channel id="outboundReplies">
<int:queue/>
</int:channel>
<int:channel id="inboundStaging">
</int:channel>
<int:aggregator ref="parentPartitionHandler" send-partial-result-on-expiry="true" send-timeout="60000000"
input-channel="inboundStaging" output-channel="outboundReplies"
expire-groups-upon-completion="true"/>
我遇到的问题是aggregator似乎正确地收集了一个组的消息,但在MessageChannelPartitionHandler中,下面的语句接收第一条可用消息时没有从消息中获取头部信息。
Message<Collection<StepExecution>> message = (Message<Collection<StepExecution>>) messagingGateway.receive(replyChannel);
因此,PartitionHandler处理jobExecutionA而不是jobExecutionB的结果,因此它完成了一个错误的作业。
似乎MessageChannelPartitionHandler使用来自QueueChannel(在我的配置中是outboundReplies)的答复,而不考虑correlationId,而是第一条可用的消息。它工作了几次,然后它不工作,然后当我调试时,我发现同样的事情发生在我身上,就像这里的帖子一样
http://forum.spring.io/forum/spring-projects/integration/724241-s-integration-s-batch-remote-step-execution-problem?view=thread
我在这里做错了什么吗?
如果您需要,我可以提供更多的配置。
编辑:
网关都工作正常。我还试图使用适配器并添加header-enricher,以便将replyChannel对象添加到Header中。我添加了一个header-enricher元素,但我可能使用错误,因为当聚合消息时,hanlder抛出“没有outputChannel或replyChannel header可用”。所有分区请求都在outboundRequests通道上发送到rabbit队列,从服务器将从inboundRequestschannel接收请求,服务激活器将处理这些请求并在outboundStaging通道上发送回应答队列。在主端分区,聚合器从inboundStaging通道读取响应消息。
<int:channel id="outboundRequests">
<int:dispatcher task-executor="taskExecutor" failover="true"/>
</int:channel>
<int:channel id="inboundStaging"/>
<int:channel id="inboundRequests"/>
<int:channel id="outboundStaging"/>
<int:channel id="setHeaderPartionHandlerReplyChannel"/>
<int-amqp:outbound-channel-adapter
id="filePartitionRequestOutboundGateway"
channel="outboundRequests"
amqp-template="rabbitTemplate"
exchange-name="${rabbitmq.classflow.exchange}"
routing-key="${rabbitmq.classflow.batch.partition.routingkey}"
mapped-request-headers="*"
/>
<int-amqp:inbound-channel-adapter
id="filePartitionRequestInboundGateway"
concurrent-consumers="${rabbitmq.classflow.batch.partition.consumers}"
channel="inboundRequests"
receive-timeout="60000000"
queue-names="${rabbitmq.classflow.batch.partition.queuename.request}"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="*"
/>
<int:header-enricher input-channel="setHeaderPartionHandlerReplyChannel" output-channel="outboundRequests">
<int:header-channels-to-string/>
</int:header-enricher>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inboundRequests"
output-channel="outboundStaging"/>
<int-amqp:outbound-channel-adapter
id="filePartitionRepyOutboundGateway"
channel="outboundStaging"
amqp-template="rabbitTemplate"
exchange-name="${rabbitmq.classflow.exchange}"
routing-key="${rabbitmq.classflow.batch.partition.queuename.reply.routingkey}"
mapped-request-headers="*"
/>
<int-amqp:inbound-channel-adapter
id="filePartitionRepyInboundGateway"
channel="inboundStaging"
queue-names="${rabbitmq.classflow.batch.partition.queuename.reply}"
connection-factory="rabbitConnectionFactory"
concurrent-consumers="${rabbitmq.classflow.batch.tenant.job.consumers}"
mapped-request-headers="*"
/>
<int:aggregator ref="parentPartitionHandler"
send-partial-result-on-expiry="true"
send-timeout="60000000"
input-channel="inboundStaging"/>
<bean id="parentPartitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="gridSize" value="3"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="setHeaderPartionHandlerReplyChannel"/>
<property name="receiveTimeout" value="60000000"/>
</bean>
</property>
<property name="stepName" value="parentPartitionStep"/>
</bean>
谢谢
Mallikarjun
这个问题在Spring Batch 3.0.3中得到了修复,只要您不将特定的ReplyChannel
注入到分区处理程序中。
spring-batch-integration
现在是主Spring Batch项目的一部分。
编辑
元素的header enricher(请参见header Channel注册表文档);这将活动标头通道对象转换为通道注册表的键。replychannel
标头(例如mapped-request-headers=“*”
)。网关不需要这样做,因为当收到答复时,它持有对出站消息的引用。
我要做的是异步计算树结构的深度,我将有树的第一层,我想启动一个异步线程来分别计算每个节点的深度。 在计算过程中,树中显然可能有一个分叉,在这一点上,我想踢一个额外的线程来计算那个分支。 我已经得到了这个工作,但我需要做一些整理逻辑,当所有这些未来完成。但我对这一过程中产生的额外的可完成的未来感到困扰。 我会用什么方法来保存所有开始的CompletableFutures+那些动态创建的,并且在执行任
我想要一个完整的未来,只发出完成的信号(例如,我没有返回值)。 我可以将CompletableFuture实例化为: 但是我应该向完整的方法提供什么呢?例如,我不能做
获取java.net.socketException:以下socket创建的SOCKS服务器异常的错误回复。我没有使用任何SOCKS服务器&它只是一个独立的程序,这是起跑线&仅在这里获取异常。我也使用反射API。 从其他方法调用,如下所示:
如何使用完全未来使用第一个可调用任务的结果作为参数,所有后续的可调用任务?我有3个任务需要这样运行: 第一个阻塞任务运行并返回一个值 我试图在下面这样做,但我被困在子句。 我无法让这个代码正常工作。在子句,如何从返回的对象响应传递参数? 注意:我确实希望在未来结束时,将所有3个任务的响应作为一个组合结果列表收集在一起,或许作为一个整数值流?在这种情况下,我想求和这些值。我希望通过多线程来提高性能。
我有一个实例列表。 如何将他们转变成这样一个未来:
问题内容: 我已经从Rich Faces 3.3升级到Rich Faces 4.2,因为Ajax不适用于IE9。现在它仍然不起作用。 收到响应后,IE会收到一个JS错误 “ SCRIPT58734:从源头上来的东西”:c00ce56e。 在尝试时 在jsf.js.html?ln = javax.faces&conversationContext = 2,第1行第21747行 我认为是因为HTTP标