当前位置: 首页 > 知识库问答 >
问题:

骆驼分割和聚合失败,因为消息将发送到多个并发消费者

靳举
2023-03-14

我有一个简单的驼峰路由,它接受一个项目列表,将它们拆分,将每个元素发送到mq节点进行处理,然后通过聚合器将它们连接在一起。

非常接近合成消息处理器:http://camel.apache.org/composed-message-processor.html

但是我们注意到拆分后,camel会创建多个并发消费者?或者交换?因为消息被发送给多个消费者,他们永远不会完成。

列表:1,2,3,4

拆分:amq::处理每个项目

骨料:

[Camel (camel-3) thread #41 - Aggregating 1 - Waiting on 3 more items
[Camel (camel-1) thread #16 - Aggregating 2 - Waiting on 3 more items
[Camel (camel-3) thread #49 - Aggregating 3 - Waiting on 2 more items
[Camel (camel-1) thread #15 - Aggregating 4 - Waiting on 2 more items

因此,camel生成了2个聚合器,每个聚合器都在等待4个项目,但每个聚合器只能获得两个。

骆驼路线:

<camelContext xmlns="http://camel.apache.org/schema/spring">

        <route> <!-- This route splits the reg request into it's items. Adding needed info to the message header.  -->
            <from uri="activemq:registration.splitByItemQueue" />  <!-- pick up the reg req -->
            <setHeader headerName="regReqId"> <!-- Need to store the Reg Req in the header  -->
                <simple>${body.registrationRequest.id}</simple>
            </setHeader>

            <split parallelProcessing="false" strategyRef="groupedExchangeAggregator"> <!-- Split the RegRequestInfo into it's individual requestItems (add, drop, etc) -->
                <method ref="requestSplitter"  method="split" />   <!-- does the actual splitting -->
                <setHeader headerName="JMSXGroupID"> <!-- This is CRITICAL. It is how we ensure valid seat check counts without db locking -->
                    <simple>FOID=${body.formatOfferingId}</simple>  <!-- grouping on the foid -->
                </setHeader>
                <to uri="activemq:registration.lprActionQueue"/> <!-- send to queue's for processing-->
            </split>
        </route>

        <route>    <!-- performs the registration + seat check -->
            <from uri="activemq:registration.lprActionQueue" />

            <bean ref="actionProcessor" method="process"/> <!-- go to the java code that makes all the decisions -->
            <to uri="activemq:registration.regReqItemJoinQueue"/> <!-- send to join queue's for final processing-->
        </route>

        <route>    <!-- This route joins items from the reg req item split. Once all items have completed, update state-->
            <from uri="activemq:registration.regReqItemJoinQueue" />  <!-- Every Reg Req Item will come here-->
            <aggregate strategyRef="groupedExchangeAggregator" ignoreInvalidCorrelationKeys="false" completionFromBatchConsumer="true"> <!-- take all the Reg Req Items an join them to their req -->
                <correlationExpression>
                    <header>regReqId</header> <!-- correlate on the regReqId we stored in the header -->
                </correlationExpression>

                <bean ref="actionProcessor" method="updateRegistrationRequestStatus"/> <!-- update status -->                   
            </aggregate>
        </route>
</camelContext>

<bean id="groupedExchangeAggregator" class="org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy" />

在我的本地机器上,上述工作正常,但是当我们部署到我们的测试服务器时,一半的消息转到一个骆驼聚合器,一半转到另一个。导致没有一个完成。请注意,在下面的配置中,我们已将骆驼的并发消费者设置为1。

这是camel/activemq配置

<amq:broker useJmx="false" persistent="false">
        <amq:plugins>
            <amq:statisticsBrokerPlugin />
        </amq:plugins>
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:0" />
        </amq:transportConnectors>
    </amq:broker>

    <!-- Basic AMQ connection factory -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://localhost" />

    <!-- Wraps the AMQ connection factory in Spring's caching (ie: pooled) factory
         From the AMQ "Spring Support"-page: "You can use the PooledConnectionFactory for efficient pooling... or you
         can use the Spring JMS CachingConnectionFactory to achieve the same effect."
         See "Consuming JMS from inside Spring" at http://activemq.apache.org/spring-support.html
         Also see http://codedependents.com/2010/07/14/connectionfactories-and-caching-with-spring-and-activemq/

         Note: there are pros/cons to using Spring's caching factory vs Apache's PooledConnectionFactory; but, until
         we have more explicit reasons to favor one over the other, Spring's is less tightly-coupled to a specific
         AMQP-implementation.
         See http://stackoverflow.com/a/19594974
    -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
    </bean>

    <bean id="jmsConfig"
          class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <bean id="activemq"
          class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

共有1个答案

夏祺然
2023-03-14

原来我们有另一个Spring上下文/servlet导入我们的配置。我们相信这是问题所在。

 类似资料:
  • 我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。 1.第一次处理后立即停止路由 与1类似,但使用闩锁而不是while loop和sleep。 使用轮询消费者 使用ConsumerTemplate()-类似于上面的代码

  • 我已经和ApacheCamel合作了一段时间,做了一些基本的工作,但现在我正在尝试创建一个路由,在该路由中,我可以让多个“消费者”访问同一条路由,或者在路由中添加一个消费者,然后处理消息。 我的想法是拥有一个由事件触发的事件驱动消费者,然后例如从ftp读取文件。我正计划做这样的事情: 所以这个想法是我有一个事件(例如直接或来自消息队列),它具有“fileName”属性,然后使用该属性从ftp下载/

  • 主要内容:1 start启动服务定时清理过期消息,1.1 cleanExpireMsg清理过期消息,1.2cleanExpiredMsg清理过期消息,2 submitConsumeRequest提交消费请求,2.2 submitConsumeRequestLater延迟提交,2.2 consumeMessageBatchMaxSize和pullBatchSize,3 ConsumeRequest执行消费任务,,,,基于RocketMQ release-4.9.3,深入的介绍了ConsumeMes

  • 我在Camel中有一个路由,当异常发生时,我想重试,但是我想设置一个属性,以便该路由在第二次尝试时可以做一些稍微不同的事情,以阻止重试中再次发生错误。这里有一个路线说明了我目前正在尝试的想法。 显然这不是真正的路线;整个主体只是在某些情况下模拟我的组件错误。我希望看到以下消息被记录: 但我真正看到的是: 我尝试将添加到异常处理程序中,但这只会抑制错误消息。我没有看到第二条开始或完成日志消息。 为什

  • 在Kafka文献中: Kafka的处理方式不同。我们的主题被划分为一组完全有序的分区,每个分区在任何给定时间都由一个使用者使用。这意味着消费者在每个分区中的位置只是一个整数,即要消费的下一条消息的偏移量。这使得消耗量的状态非常小,每个分区只有一个数字。这种状态可以定期检查。这使得消息确认的等价物非常便宜。 然而,按照同一份文件中的快速入门指南,我很容易就能: 使用单个分区创建主题 创建一个游戏机制

  • 我使用的是Camel 2.16.1。关闭后,骆驼的消费者仍然接受新消息。有没有办法迫使消费者立即停止消费。这里也有同样的问题:驼峰关机策略:飞行中的信息不会减少 我为这个问题创建了一个测试用例: 运行测试用例时,我们可以看到机上交换的数量在开始优雅关闭后增加: