问题:流输入仅适用于向聚合器发送输出通道输出的1个输入。随后的消息只进入丢弃通道logLateArvers。什么条件被用来发送到丢弃通道?
描述:尝试使用使用WebSphere的聚合器为基本jms移植Spring集成示例。
更新:-打开调试显示轮询器正在工作。消息被拉入并放到MQ,响应被拾取。但是,对于第一组消息之后的MQ场景,不使用AggregatingMessageHandler。消息被发送到丢弃通道与通道“输出”上的“logLateArrivers”适配器进行输出。我将问题声明的措辞改得更具体一些。
Spring集成示例:Spring集成Github示例
使用Spring集成的输出示例:
test1
test2
[TEST1, TEST1]
[TEST2, TEST2]
使用Spring与Websphere集成的输出
test1
test2
[TEST1, TEST2]
[TEST1, TEST2]
使用Websphere MQ移植更改
>
common.xml
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="com.ibm.mq.jms.MQConnectionFactory">
<property name="channel" value="channelName" />
<property name="hostName" value="host1234" />
<property name="port" value="1111" />
<property name="queueManager" value="testqmgr" />
<property name="transportType" value="1" />
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
</bean>
<bean id="requestQueue" class="com.ibm.mq.jms.MQQueue">
<constructor-arg value="requestQueue"/>
</bean>
<bean id="requestTopic" class="com.ibm.mq.jms.MQTopic">
<constructor-arg value="topic.demo"/>
</bean>
<bean id="replyQueue" class="com.ibm.mq.jms.MQQueue">
<constructor-arg value="replyQueue"/>
</bean>
<!-- Poller that is the stream in channel for console input -->
<integration:poller id="poller" default="true" fixed-delay="1000"/>
一个ggregation.xml
<int-stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/>
<int:channel id="stdinToJmsoutChannel"/>
<int:chain input-channel="stdinToJmsoutChannel">
<int:header-enricher>
<int:header name="jms_replyTo" ref="replyQueue" />
</int:header-enricher>
<int-jms:outbound-channel-adapter destination="requestTopic" />
</int:chain>
<int-jms:message-driven-channel-adapter channel="jmsReplyChannel"
destination="replyQueue"/>
<int:channel id="jmsReplyChannel" />
<int:aggregator input-channel="jmsReplyChannel" output-channel="out"
group-timeout="5000"
expire-groups-upon-timeout="false"
send-partial-result-on-expiry="true"
discard-channel="logLateArrivers"
correlation-strategy-expression="headers['jms_correlationId']"
release-strategy-expression="size() == 2"/>
<int:logging-channel-adapter id="logLateArrivers" />
<!-- Subscribers -->
<int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
<int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
<int:transformer input-channel="upcase" expression="payload.toUpperCase()" />
<!-- Profiles -->
<beans profile="default">
<int-stream:stdout-channel-adapter id="out" append-newline="true"/>
</beans>
<beans profile="testCase">
<int:bridge input-channel="out" output-channel="queueChannel"/>
<int:channel id="queueChannel">
<int:queue />
</int:channel>
</beans>
消息应该在jms_correlationId
上关联。打开DEBUG日志记录并比较示例和您的版本之间的消息流。可能相关id设置不正确。
入站网关使用此逻辑。。。
replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
因此,当发送到聚合器时,与每个请求关联的消息应该获得相同的jms\u correlationId
。
您的测试表明这两条消息在某种程度上具有相同的消息id。
编辑
到达的消息具有相同的相关ID(在本例中为头['jms_correlationId']
)将被丢弃(晚到达者),除非到期-组-完成后="true"
-允许新组开始而不是丢弃。您需要弄清楚为什么第二组具有与第一组相同的相关id。
我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只
在我的用例中,最简单的集成组件安排是什么: 接收来自多个来源和多种格式的消息(所有消息都是JSON序列化对象)。 将消息存储在缓冲区中最多10秒(聚合) 通过不同的类属性getter(例如class1.someId(),class2.otherId(),...) 释放所有分组的消息并转换为新的聚合消息。 到目前为止(第1点和第2点),我正在使用聚合器,但不知道3)处的问题是否有现成的解决方案或者我
我有以下应用程序要求: 从RabbitMq接收消息,然后根据一些更复杂的规则进行聚合,例如基于属性(具有预先给定的类型时间映射)和基于消息在队列中等待的现有时间(属性) 正如您在图中看到的一个用例:三条消息已经聚合并等待下一秒发布(因为当前速率为),但就在那时,以到达,并更新了,使其成为优先级最高的第一条消息。因此,在下一个选项中,我们不再发布聚合3,而是发布聚合2,因为它现在具有更高的优先级。
目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I
null 重新创建问题的测试可以在https://github.com/hawk1234/spring-integration-example commit 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4中找到。当您运行测试时,应用程序日志可以在路径build/logs/spring-integration-example.log下获得。当前测试挂起,因为网关从未
我有一个用例,我的消息被拆分两次,我想聚合所有这些消息。如何才能最好地实现这一点,我应该通过引入不同的序列头来聚合消息两次,还是有办法通过重写消息分组的方法在单个聚合步骤中聚合消息?