当前位置: 首页 > 知识库问答 >
问题:

activemq慢速消费者阻止生产者,尽管producerFlowControl为false

贾实
2023-03-14

出于测试原因,我指定了low memoryusage limit low(35MB)以使问题apear更快,但实际情况是,当activemq的问题出现时,我最终需要它来删除旧消息。

我发现了一个不令人满意的解决方案,即在ActiveMQConnectionFactory中设置useasyncsEnd=true,并指定sendtimeout。这使得producer不会被阻塞,但通过这种方式,最新的消息被删除,而不是旧的消息。

最后,我说的是非持久的话题。

        <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" producerFlowControl="false" memoryLimit="35 Mb">
               <pendingSubscriberPolicy>
                   <vmCursor />
                </pendingSubscriberPolicy>
                    <messageEvictionStrategy>
                        <oldestMessageEvictionStrategy/>
                    </messageEvictionStrategy>
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="10"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>

    <systemUsage>
        <systemUsage sendFailIfNoSpace="true">
            <memoryUsage>
                <memoryUsage limit="35 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="1 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="5000 mb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>  
    <bean class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledJmsConnectionFactory"/>
    <property name="timeToLive" value="100"/>
</bean>

我传输javax.jms.ObjectMessage,大小相对较小。

我在customer Premissions中发现了这个问题我的应用程序中有许多toppics,但设法在本地复制了它,从1个线程发送不间断的消息,总是连续地发送到同一个主题。发送的消息只是一个小字符串。

我只有一个生产者,当我有一个(或更多)缓慢的消费者时,问题似乎就出现了--但一个缓慢的消费者就足够了--。如果不存在慢消费,问题就不会出现。

       <transportConnectors>
       <transportConnector name="openwire" uri="nio://0.0.0.0:33029?wireFormat.maxInactivityDuration=60000&amp;wireFormat.maxInactivityDurationInitalDelay=60000"/>
    </transportConnectors>

共有1个答案

赵禄
2023-03-14

我怎样才能重新创建这个?有多少生产者/消费者依附于这个主题?只有一个话题吗?

您的设置看起来不错,但不需要在策略中设置MemoryLimit=35MB。将其设置为与整个系统使用相同是没有意义的。其思想是,所有主题的内存限制总和将等于系统内存限制。例如,如果您有两个主题,每个主题将使用35MB(2*35==70MB),这将超过整个系统内存设置。我不认为这是你所看到的具体原因,而是要记住的事情。

另外,这是什么版本的ActiveMQ?如果您已经编写了一些测试,可以产生这个,让我知道。

 类似资料:
  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我在这里阅读了ActiveMQ文档中的以下引用:

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 这里的一些配置:非持久消费者、非持久消息、禁用的流控制、默认预取大小、优化确认=true、异步发送=true、使用jms连接ActiveMQ 例如 一个生产者、一个消费者, 生产者发送速率可以达到6k/s 但是,在这种情况下:一个生产者三个消费者, 生产者发送速率下降到4k/s 这是我的一些关键代码: 发件人类别: sendmain方法: 接收机类代码: 接收器类代码在这里隐藏了一些方法,例如创建