当前位置: 首页 > 知识库问答 >
问题:

骆驼路由中的生产者流控制问题(持久消息)

太叔鹏云
2023-03-14

我在查找正确的 activemq 配置集以确保 Apache 骆驼路由中的消息吞吐量一致时遇到问题。当前配置使用以下技术:

>

  • 骆驼 (2.15.2)
  • 活动磁共振 (5.12.1)
  • 雄猫 (7.0.56)

    以下是Camel中用于ActiveMQ的bean配置集:

    <bean id="jmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:6616?jms.prefetchPolicy.queuePrefetch=100" />
         <property name="watchTopicAdvisories" value="false" />
         <property name="producerWindowSize" value="2300" />
    </bean>
    
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="20" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="idleTimeout" value="0"/>
    </bean>
    
    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
            <property name="connectionFactory" ref="pooledConnectionFactory"/> 
            <property name="transactionManager" ref="jmsTransactionManager"/> 
            <property name="transacted" value="true"/>
    

    --

    <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
            <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>
    
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig" />
    </bean>
    

    以下是在activemq中找到的代理特定配置。xml文件:

    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="localhost" dataDirectory="./activemq/data/" advisorySupport="false">
            <destinationPolicy>
                <policyMap>
                    <policyEntries>
                        <policyEntry queue="PICKAXE.L5.PROC.>" producerFlowControl="true" storeUsageHighWaterMark="50" />
                        <policyEntry queue="PICKAXE.L5.COL.>" producerFlowControl="true" storeUsageHighWaterMark="95" />
                    </policyEntries>
                </policyMap>
            </destinationPolicy>
    
            <managementContext>
                <managementContext createConnector="true" />
            </managementContext>
    
            <persistenceAdapter>
                <kahaDB directory="./activemq/kahadb/" />
            </persistenceAdapter>
    
            <systemUsage>
                <systemUsage sendFailIfNoSpaceAfterTimeout="3000000">
                    <memoryUsage>
                        <memoryUsage limit="750 mb" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="2 gb" />
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="500 mb" />
                    </tempUsage>
                </systemUsage>
            </systemUsage>
    
            <transportConnectors>
    
                <transportConnector name="openwire"
                    uri="tcp://0.0.0.0:6616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
                <transportConnector name="amqp"
                    uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            </transportConnectors>
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>
        </broker>
    

    我正在运行下面的骆驼路由。队列A接收大量消息(1000/s),因此它很快就会被填满,因为这些消息的最终消费者跟不上。当消息量最终达到持久存储生产者的50%时,流控制规则会阻止更多消息被放入队列A。但是,当我通过JMX检查队列深度时,队列A和队列B都不会改变,就好像消费者也被阻止了一样。

        from(activemq:queue:PICKAXE.L5.PROC.A)
            .to(activemq:queue:PICKAXE.L5.COL.B);
    
        from(activemq:queue:PICKAXE.L5.COL.B)
            .autoStartup(!localFlag)
            .to(customEndpoint)
            .routeId(collectionRouteId);
    

    在大约一周的时间里,我尝试了jms/activemq配置的各种排列,但都没有成功,所以我会感谢任何想法。我想要的行为是让这个流中的消息消费者继续从持久存储中移除消息,这将允许消息继续完整地流动。

  • 共有1个答案

    上官羽
    2023-03-14

    该问题是由在上述配置中设置为3000000的sendFailIfNoSpaceAfterTimeout过大引起的。这导致代理在确认由于持久存储已满而导致发送()命令失败之前等待。

    上述配置已替换为以下内容:

    <systemUsage sendFailIfNoSpaceAfterTimeout="300">
    

    这确保了(由于消息是持久的,队列集成到Camel路由中),当永久存储已满导致失败时,每0.3秒重试一次send()操作。

     类似资料:
    • 以下是《行动中的骆驼》中关于生产者和消费者的定义。 使用者可以从外部服务接收消息,在某些系统上轮询消息,甚至创建消息本身。然后,该消息流经一个处理组件,该组件可以是企业集成模式(EIP)、处理器、拦截器或其他一些自定义创建。消息最终被发送到一个目标endpoint,该endpoint是生产者的角色。路由可能有许多修改消息或将其发送到另一个位置的处理组件,也可能没有,在这种情况下,它将是一个简单的管

    • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

    • 如有任何帮助,不胜感激。

    • 我在JPA上遇到了以下问题,但这可能更像是一个关于骆驼的概念问题。 我需要一个基于cron的石英消费者。但如果触发了,我想选择JPA组件作为第一步。 但是如果我用“to”调用JPA组件,那么它被用作生产者,而不是消费者。我可以以某种方式使用JPA组件来处理这个问题吗,或者我必须遵循服务激活器(基于bean的)逻辑并将JPA组件留在后面? 提前谢谢你,葛格利

    • 这是我的骆驼上下文摘录:- 下面是日志中的一个片段:- 即使是也不能阻止堆栈跟踪。现在我已经发现,如果我向JMS消费者添加disableReplyTo=true,那么stacktrace就不会显示哪一个是优秀的。