我创建了一个基于spring,jms和activemq的简单的生产者消费者模拟,我试图从生产者和消费者双方获得高性能,
连接设置:
<tx:annotation-driven />
<bean id="transactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<amq:connectionFactory id="amqConnectionFactory" brokerURL="failover:(tcp://${broker.url}:61616)" />
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory" />
</bean>
<amq:queue id="queue" physicalName="queue" />
<beans:bean id="jsonMessageConverter" class="XXXXX.converter.JsonMessageConverter" />
消费者设置:
<jms:listener-container concurrency="10"
acknowledge="auto" prefetch="1" message-converter="jsonMessageConverter" transaction-manager="transactionManager"
>
<jms:listener id="queueListener_1" destination="ooIntegrationQueue"
ref="myMessageListenerAdapter" />
</jms:listener-container>
<beans:bean id="myMessageListenerAdapter"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter" >
<beans:property name="delegate" ref="consumer"/>
</beans:bean>
<beans:bean id="consumer" class="XXX.ConsumerImpl"/>
生产者设置:
<beans:bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
p:connectionFactory-ref="connectionFactory" p:messageConverter-ref="jsonMessageConverter"
p:defaultDestination-ref="ooIntegrationQueue" p:sessionTransacted="true" />
从消费者开始,我设法每秒消耗大约25条消息,这非常慢,我发现瓶颈是我正在使用事务,在谷歌搜索了一段时间之后,并使用了配置,我发现自动装配DefaultMessageListenerContainer并将缓存级别更改为
listenerContainer.setCacheLevelName("CACHE_SESSION")
我的性能提高到每秒约1500条消息,同时仍保持事务。
我的问题现在是生产者,仍然停留在每秒约25次操作,我的生产者测试很简单:
int numOfMessages = getNumberOfMessages();
double startTime = System.currentTimeMillis();
for (int i = 1; i <= numOfMessages; i++) {
jmsTemplate.convertAndSend("HelloWorld" + i);
}
double endTime = System.currentTimeMillis();
double totalTime=(endTime-startTime)/1000;
System.out.println("Time - "+totalTime+" seconds");
System.out.println("EPS - "+numOfMessages/totalTime);
我想知道如何与制作人达到类似的性能,因为它现在成为整个系统的瓶颈。
抱歉,这个答案来不及为原始海报提供帮助。我最近调查了JmsTemplate
性能。即使使用相同的交付和确认模式,本地JMS
代码似乎也比快得多JmsTemplate
。问题原来是ActiveMQ
通常默认为异步发送,但是当您使用JmsTemplate
它时,使用的是同步发送。这会大大降低性能。您可以将ActiveMQConnectionFactory
的useAsyncSend
属性设置true
为强制异步发送。此处有更多详细信息:JmsTemplate不是邪恶的
问题内容: 我想看看使用多线程生产者而不是单线程生产者会有多少时间差异。我在本地计算机上设置了ActiveMQ,编写了生产者类,该类将初始化并在其构造函数中启动JMS连接。我将消息限制设置为3M,将所有消息推送到ActiveMQ大约花费了50秒。我只发送了一个字符串“ hello world” 3M次。 然后,我使用了相同的生产者对象(一个连接但有多个会话),并使用线程大小为8的ExecutorS
您好,我的基本要求是有一个可以发送消息的路由,并将其放在JMS队列中。camel上下文在JavaEE 6容器中运行,即JBoss AS 7.1.1,因此它附带了HornetQ for JMS;我通过引导单例启动上下文,但我不使用camel cdi。到目前为止,我一直在使用camel-jms组件,但现在我希望尽可能迁移到camel-sjms,因为springless。 我的问题是:在这个JavaEE
我使用带有幂等生产者配置的spring kafka: 这是我的配置道具: 我的Kafka制作人抛出OutOfOrderSequence异常: 2019-03-06 21:25:47发送者[ERROR][生产者clientId=生产者-1]代理返回org.apache.kafka.common.errors.OutOfOrderSequence异常:代理在偏移-1处收到主题分区主题-1的乱序序列号。
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前
我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了但我不能使用它...我可以看到消息被使用但这不是希望的... 生产者代码 ProducerTX.Properties 消费者 感谢你的帮助..谢谢
问题内容: 我想向同一队列发送一批20k JMS消息。我使用10个线程将任务拆分,因此每个线程将处理2k条消息。我不需要交易。 我想知道是否建议建立一个连接,一个会话和10个生产者? 如果所有线程共享一个生产者,该怎么办?我的消息会损坏还是会同步发送(不会提高性能)? 如果我总是连接到同一队列,那么决定是创建新连接还是会话的一般指导方针是什么? 谢谢你,很抱歉一次问了很多。 问题答案: 如果某些消