当前位置: 首页 > 知识库问答 >
问题:

Apache Camel+ActiveMQ。两个代理之间的消息传输太慢

傅琦
2023-03-14

我试图通过Apache Camel在两个ActiveMQ代理之间传输消息,但问题是我只能达到每秒135条消息的传输速率。我想增加这个数字。情况是远程服务器上有2个ActiveMQ代理。我想从第一个代理上的队列获取消息,并通过骆驼路由将这些消息传送到第二个代理上的多个队列。

this is how I establish connections:
CamelContext context = new DefaultCamelContext();
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://10.1**.6.195:62222");
            ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory("admin", "admin", "tcp://10.1**.6.195:
            PooledConnectionFactory pf1 = new PooledConnectionFactory(connectionFactory);
            pf1.setMaximumActiveSessionPerConnection(45);
            pf1.setMaxConnections(40);
            PooledConnectionFactory pf2 = new PooledConnectionFactory(connectionFactory2);
            pf2.setMaximumActiveSessionPerConnection(45);
            pf2.setMaxConnections(40);
            context.addComponent("broker1", JmsComponent.jmsComponentAutoAcknowledge(pf1));
            context.addComponent("broker2", JmsComponent.jmsComponentAutoAcknowledge(pf2));

我的路由:

context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                onException(SetParamsException.class)
                    .filter()
                    .method(new IsDisableFlowLoggingFilter(), "filter")
                    .process(new CreateErrorHandlerLogMessageProcessor())
                    .to("broker2:queue:ESB.EVENT.LOGGING");

                from(fromBroker+":queue:"+sourceQueue+"?maxConcurrentConsumers=500&concurrentConsumers=40&asyncConsumer=true")
                    .process(new SetParamsProcessor())
                        .to("seda:EVENT.LOGGING")
                        .to("seda:EVENT.TRANSACTION.LOGGING")
                        .to("seda:EVENT.MONITOR.LOG")
                        .to("xslt:file://transform.xsl")
                            .to("broker2:queue:testMQDestinationOLOLO?maxConcurrentConsumers=500&concurrentConsumers=20&asyncConsumer=true")
                from("seda:EVENT.LOGGING")
                    .filter()
                    .method(new IsDisableFlowLoggingFilter(), "filter")
                    .process(new CreateEventMessageProcessor())
                    .to("broker2:queue:EVENT.LOGGING");

                from("seda:EVENT.TRANSACTION.LOGGING")
                    .process(new CreateTransactionDetailsMessageProcessor())
                    .to("broker2:queue:EVENT.TRANSACTION.LOGGING");

                from("seda:EVENT.MONITOR.LOG")
                    .process(new CreateMonitoringMessageProcessor())
                    .to("broker2:queue:EVENT.MONITOR.LOG");
            }
        });
        context.start();

ps:顺便说一句,ping远程服务器~2ms

共有1个答案

魏毅
2023-03-14

这是相当多的伐木正在进行,我建议减少它。否则,看起来就像是在消息上使用xslt,考虑到通常的xslt速度,这可能会相当慢。我还建议发布您的ActiveMQConnectionFactory配置。我强烈建议在具有更多会话的池上运行这些。我目前在生产中使用camel和activemq的速度非常快。下面是一个示例工厂,您可以对其进行建模:

<bean id="myAmqBean" class="org.apache.activemq.camel.component.ActiveMQComponent" destroy-method="doStop">
    <property name="configuration">
        <bean class="org.apache.camel.component.jms.JmsConfiguration">
            <property name="concurrentConsumers" value="20" />
            <property name="maxConcurrentConsumers" value="20" />
            <property name="acceptMessagesWhileStopping" value="true" />
            <property name="connectionFactory">
                <bean class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
                    <property name="maxConnections" value="10" />
                    <property name="MaximumActiveSessionPerConnection" value="500" />
                    <property name="connectionFactory">
                        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                            <property name="brokerURL" value="${activemq1.brokerUrl}" />
                            <property name="userName" value="${activemq1.username}" />
                            <property name="password" value="${activemq1.password}" />
                            <property name="dispatchAsync" value="true" />
                            <property name="alwaysSessionAsync" value="true" />
                            <property name="useAsyncSend" value="true" />
                        </bean>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
 类似资料:
  • 我不熟悉Spring集成。 我的用例是: 侦听RabbitMQ队列/主题,获取消息,处理它,将其发送给其他消息代理(通常是另一个RabbitMQ实例)。 预期负载:5000条消息/秒 正在应用中。属性我们可以为一台主机设置配置。 如何在两个消息代理之间使用Spring集成? 我看到的所有示例都是针对一个消息代理的。任何关于两个消息代理和Spring集成的入门指南。 祝好 马赫什

  • 我在完整图形拓扑上有一个代理网络,在不同服务器上有3个节点:A,B和C。每个代理都附加了一个生产者,并且出于测试目的,在代理 C 上只有一个非代理使用者。由于我使用的是完整图拓扑,因此每个代理还为其他每个节点都有一个代理使用者。 问题是:A收到一些消息。我希望它将这些消息转发给代理C,该代理连接了一个“真实”消费者。这不会发生,代理A存储这些消息,直到一个“真实”消费者连接到它。 我的配置(或理解

  • 刚刚浏览了这些文档: < Li > http://ActiveMQ . Apache . org/failover-transport-reference . html 那么,是否有可能配置消费者和生产者应用程序来连接和平衡多个代理实例之间的负载(以循环或类似的方式)?

  • 我使用网络连接器配置了两个代理 A 和 B。如果我使用独占使用者(单个使用者)或消息组(JMXgroupID),消息顺序是否保留? 在经纪人文档的网络中,我发现: 代理网络不会保留总消息排序。总排序适用于单个使用者,但网络桥引入第二个使用者。此外,网络桥接使用者通过 producer.send(..) 转发消息,因此它们从转发代理上的队列头转到目标上队列的尾部。如果单个使用者在联网代理之间移动,则

  • 问题内容: 我是新来的消息,想知道的区别,,和 任何人都知道这些产品的不同之处吗? 提前致谢 ! 编辑: 还想知道学习这些东西的任何好地方/资源。 问题答案: ActiveMQ是一个消息代理,它实现JMS API并支持许多跨语言客户端和网络协议。它使您可以实现队列或主题,并编写侦听器或订阅者以响应队列事件。 Mule和ServiceMix是开源ESB(企业服务总线)。ESB具有JMS以外的功能:排

  • 我们需要将消息从一个ActiveMQ代理复制到另一个代理。这里消息必须只是复制,并且消息应该存在于两个代理中。 我可以想到一个自定义应用程序,它订阅某个目标并读取该消息并将消息重新发布到多个代理中的目标。 我没有权限在经纪人中进行更改,所以我想不出经纪人网络选项。 是否有任何最佳实践或工具可用于将A-MQ消息从一个代理复制到另一个代理?