当前位置: 首页 > 知识库问答 >
问题:

DefaultMessageListenerContainer未缩放

缪风史
2023-03-14

我有一个DefaultMessageListenerContainer,它(在我看来)没有扩展。容器被定义为侦听队列,其中有100条消息。

我希望容器可以达到任何长度,消息将尽可能快地被使用(通过观察maxConcurrentConsumers配置)。所以我假设有7个ConcurrentConsumer。(在container-startup时由2个ConcurrentConsumer开始)一些日志记录信息:

activeConsumerCount: 5
concurrentConsumers: 2
scheduledConsumerCount: 5
idleConsumerLimit: 1
idleTaskExecLimit: 1
maxConcurrentConsumers: 7

我的Spring-config(它的一部分):

<bean id="abstractMessageListenerContainer" class="my.package.structure.LoggingListenerContainer" abstract="true">
    <property name="connectionFactory" ref="jmscfCee" />
    <property name="maxConcurrentConsumers" value="7"/>
    <property name="receiveTimeout" value="100000" />
    <property name="concurrentConsumers" value="2" />
</bean>

<bean class="my.package.structure.LoggingListenerContainer" parent="abstractMessageListenerContainer">
    <property name="destinationName" value="MY.QUEUE" />
    <property name="messageListener" ref="myMessageListener" />
</bean>

<bean id="myMessageListener" class="my.package.structure.ListenerClass"></bean>
public class LoggingListenerContainer extends DefaultMessageListenerContainer{

private static final Logger logger = Logger
        .getLogger(LoggingListenerContainer.class);
@Override
protected void doInvokeListener(MessageListener listener, Message message)
        throws JMSException {

    logger.info("activeConsumerCount: " + this.getActiveConsumerCount());
    logger.info("concurrentConsumers: " +  this.getConcurrentConsumers());
    logger.info("scheduledConsumerCount: " + this.getScheduledConsumerCount());
    logger.info("idleConsumerLimit: " + this.getIdleConsumerLimit());
    logger.info("idleTaskExecLimit: " + this.getIdleTaskExecutionLimit());
    logger.info("maxConcurrentConsumers: " + this.getMaxConcurrentConsumers());
    super.doInvokeListener(listener, message);
}

我的侦听器类:

public class ListenerClass implements MessageListener {


    public void onMessage(Message msg) {
           //Do some business function
    }

}

能不能有人这么好心来纠正我的配置,或者给我一些关于我的配置的提示,或者解释我的容器的方法?(如果我误解了什么)

我正在本地测试ActiveMQ(在WebSphere MQ的生产中)-如果它与可伸缩性主题相关的话。

<bean id="jmscfCee" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>${jmscfCee.hostName}</value>
        </property>
</bean>

<bean id="jmscfCeeCachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory ">
    <constructor-arg ref="jmscfCee" />
    <property name="sessionCacheSize" value="10" />
</bean>

共有1个答案

鲁英卫
2023-03-14

看情况.几年前我在ActiveMQ上遇到过类似的问题,它的默认行为被大量复制到大量(数千条)小消息上。默认情况下,每个使用者将以1000个批量预取消息,因此,如果您有少量消息,您可能会发现它们都已结束在一个使用者的预取缓冲区中,而其他使用者则处于空闲状态。

您可以在连接URI上或在Spring配置中使用预取策略来调整这种行为,如果这是您构建连接工厂的方式的话。

<amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost">
  <property name="prefetchPolicy">
    <amq:prefetchPolicy all="1" />
  </property>
</amq:connectionFactory>

我当时使用的ActiveMQ版本不支持0的预取限制(即不要预取,每次只需转到代理),但文档提示现在允许这样做。

 类似资料:
  • 我在Redhat ActiveMQ中有两个队列,一个用于消费,另一个用于生成和消费对象消息。 一旦从主队列中消耗,它会被推到第二队列进行进一步处理,但是当使用JmsTemboard时,消息会随机丢失, 我在DMLC容器2和JmsTemplate的ActiveMQConnectionFactory中使用相同的Bean 让我知道如何确保消息不会在JmsTemplate中丢失。

  • 它是工作的,因为我的listener类很好地接收消息,但日志看起来很奇怪。开始后,看起来只有一个线程在处理消息,即使队列中有几个线程在等待: 正如您所看到的,最终只有容器#5用于处理所有剩余的消息。消息不是并行处理的,定义的并发性似乎没有被使用。我不知道这是不是完全相同的问题,但我看过这篇文章,但我没有使用ActiveMQ,我没有这个预取选项。 你能解释一下我为什么会有这种行为吗。是弹簧配置错误还

  • 在我正在开发的应用程序中,我使用了Spring JMS DefaultMessageListenerContainer和作为SessionAwareMessageListener的JMS使用者。还有一个XA transactionManager,在JMS和JDBC之间共享。作为JMS提供者,我使用WebLogic。 我注意到,每次消费者收到一条消息时,JMS会话都与之前消息中使用的会话完全不同:

  • 我使用DefaultMessageListenerContainer来消费ActiveMQ队列中的消息,如下所示。在这种实现中,是否有轮询机制,侦听器是否每隔1秒左右轮询队列以查看是否有新消息,或者是否在队列中有新消息时调用onMessage方法?如果使用轮询,我们如何增加或减少轮询频率(时间)。

  • 我正在尝试使用ffmpeg和使用以下命令压缩一个视频。 然而,生成的视频的大小大于原始视频的大小。有没有人可以指出为什么会发生这种情况,如果有其他命令我应该使用。

  • 当我从使用CombinedDomainXYPlot的图表中隐藏系列的数量时,所有的范围轴都可以很好地自动重新缩放。但是,域轴不会重新缩放。是否有任何方法手动刷新缩放,或者可能有一个设置,我错过了启用自动缩放域轴在此设置?