当前位置: 首页 > 知识库问答 >
问题:

使用WebSphere的Spring集成聚合示例

有骏奇
2023-03-14

问题:流输入仅适用于向聚合器发送输出通道输出的1个输入。随后的消息只进入丢弃通道logLateArvers。什么条件被用来发送到丢弃通道?

描述:尝试使用使用WebSphere的聚合器为基本jms移植Spring集成示例。

更新:-打开调试显示轮询器正在工作。消息被拉入并放到MQ,响应被拾取。但是,对于第一组消息之后的MQ场景,不使用AggregatingMessageHandler。消息被发送到丢弃通道与通道“输出”上的“logLateArrivers”适配器进行输出。我将问题声明的措辞改得更具体一些。

Spring集成示例:Spring集成Github示例

使用Spring集成的输出示例:

test1
test2
[TEST1, TEST1]
[TEST2, TEST2]

使用Spring与Websphere集成的输出

test1
test2
[TEST1, TEST2]
[TEST1, TEST2]

使用Websphere MQ移植更改

>

  • common.xml

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="com.ibm.mq.jms.MQConnectionFactory">
                <property name="channel" value="channelName" />
                <property name="hostName" value="host1234" />
                <property name="port" value="1111" />
                <property name="queueManager" value="testqmgr" />
                <property name="transportType" value="1" /> 
            </bean>
        </property>
        <property name="sessionCacheSize" value="10"/>
    </bean>
    
    <bean id="requestQueue" class="com.ibm.mq.jms.MQQueue">
        <constructor-arg value="requestQueue"/>
    </bean>
    
    <bean id="requestTopic" class="com.ibm.mq.jms.MQTopic">
        <constructor-arg value="topic.demo"/>
    </bean>
    
    <bean id="replyQueue" class="com.ibm.mq.jms.MQQueue">
        <constructor-arg value="replyQueue"/>
    </bean>
    
    <!-- Poller that is the stream in channel for console input -->
    <integration:poller id="poller" default="true" fixed-delay="1000"/>
    

    一个ggregation.xml

    <int-stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/>
    
    <int:channel id="stdinToJmsoutChannel"/>
    
    <int:chain input-channel="stdinToJmsoutChannel">
        <int:header-enricher>
            <int:header name="jms_replyTo" ref="replyQueue" />
        </int:header-enricher>
        <int-jms:outbound-channel-adapter destination="requestTopic" />
    </int:chain>
    
    <int-jms:message-driven-channel-adapter channel="jmsReplyChannel"
        destination="replyQueue"/>
    
    <int:channel id="jmsReplyChannel" />
    
    <int:aggregator input-channel="jmsReplyChannel" output-channel="out"
        group-timeout="5000"
        expire-groups-upon-timeout="false"
        send-partial-result-on-expiry="true"
        discard-channel="logLateArrivers"
        correlation-strategy-expression="headers['jms_correlationId']"
        release-strategy-expression="size() == 2"/>
    
    <int:logging-channel-adapter id="logLateArrivers" />
    
    <!-- Subscribers -->
    
    <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
    
    <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
    
    <int:transformer input-channel="upcase" expression="payload.toUpperCase()" />
    
    <!-- Profiles -->
    
    <beans profile="default">
    
        <int-stream:stdout-channel-adapter id="out" append-newline="true"/>
    
    </beans>
    
    <beans profile="testCase">
    
        <int:bridge input-channel="out" output-channel="queueChannel"/>
    
        <int:channel id="queueChannel">
            <int:queue />
        </int:channel>
    
    </beans>
    
  • 共有1个答案

    松和安
    2023-03-14

    消息应该在jms_correlationId上关联。打开DEBUG日志记录并比较示例和您的版本之间的消息流。可能相关id设置不正确。

    入站网关使用此逻辑。。。

    replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
    

    因此,当发送到聚合器时,与每个请求关联的消息应该获得相同的jms\u correlationId

    您的测试表明这两条消息在某种程度上具有相同的消息id。

    编辑

    到达的消息具有相同的相关ID(在本例中为头['jms_correlationId'])将被丢弃(晚到达者),除非到期-组-完成后="true"-允许新组开始而不是丢弃。您需要弄清楚为什么第二组具有与第一组相同的相关id。

     类似资料:
    • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只

    • 在我的用例中,最简单的集成组件安排是什么: 接收来自多个来源和多种格式的消息(所有消息都是JSON序列化对象)。 将消息存储在缓冲区中最多10秒(聚合) 通过不同的类属性getter(例如class1.someId(),class2.otherId(),...) 释放所有分组的消息并转换为新的聚合消息。 到目前为止(第1点和第2点),我正在使用聚合器,但不知道3)处的问题是否有现成的解决方案或者我

    • 我有以下应用程序要求: 从RabbitMq接收消息,然后根据一些更复杂的规则进行聚合,例如基于属性(具有预先给定的类型时间映射)和基于消息在队列中等待的现有时间(属性) 正如您在图中看到的一个用例:三条消息已经聚合并等待下一秒发布(因为当前速率为),但就在那时,以到达,并更新了,使其成为优先级最高的第一条消息。因此,在下一个选项中,我们不再发布聚合3,而是发布聚合2,因为它现在具有更高的优先级。

    • 目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I

    • null 重新创建问题的测试可以在https://github.com/hawk1234/spring-integration-example commit 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4中找到。当您运行测试时,应用程序日志可以在路径build/logs/spring-integration-example.log下获得。当前测试挂起,因为网关从未

    • 我有一个用例,我的消息被拆分两次,我想聚合所有这些消息。如何才能最好地实现这一点,我应该通过引入不同的序列头来聚合消息两次,还是有办法通过重写消息分组的方法在单个聚合步骤中聚合消息?