当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ并发问题-多个使用者从队列中使用同一消息

尉迟默
2023-03-14

我使用Spring JMS和ActiveMQ,其中有一个客户机将消息推送到队列,有多个使用者线程监听并从队列中删除消息。有些时候,相同的消息会被两个使用者从队列中出列。我不希望这种行为,并希望确保仅有的一条消息由一个消费者线程处理。你知道我哪里出了问题吗?

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans     
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">
    <context:annotation-config />
    <context:component-scan base-package="com.myapp" />

    <!-- JMS ConnectionFactory config Starts -->
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>${brokerURL}</value>
        </property>
        <property name="userName" value="${username}" />
        <property name="password" value="${password}" />
    </bean>

    <bean id="pooledJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>
    <!-- JMS ConnectionFactory config Ends -->

    <!-- JMS Template config Starts -->
    <bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="${activemq.consumer.destinationName}" />
    </bean>

    <bean id="myQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
    </bean>
    <!-- JMS Template config Ends -->

    <!-- JMS Listener config starts -->
    <bean id="simpleMessageConverter"
        class="org.springframework.jms.support.converter.SimpleMessageConverter" />

    <bean id="myContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers" value="${threadcount}" />
        <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
        <property name="destination" ref="myQueue" />
        <property name="messageListener" ref="myListener" />
        <property name="messageSelector" value="JMSType = 'New'" />
    </bean>

    <bean id="myListener"
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="myapp.MessageListener" />
        </constructor-arg>
        <property name="defaultListenerMethod" value="receive" />
        <property name="messageConverter" ref="simpleMessageConverter" />
    </bean>
    <!-- JMS Listener config Ends -->


    <!-- enable the configuration of transactional behavior based on annotations -->
    <bean id="myJMSMessageSender" class="myapp.JMSMessageSender">
        <property name="jmsTemplate" ref="myQueueTemplate" />
        <property name="jmsQueue" ref="myQueue" />
        <property name="messageConverter" ref="simpleMessageConverter" />
    </bean>


    <bean id="myQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
    </bean>

</beans>

ActiveMQ 5.9.1配置:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="instance8161" dataDirectory="${activemq.data}" persistent="false">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic="&gt;">
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        ... <!-- rest is default ActiveMQ Config -->
</broker>

共有1个答案

贲骏喆
2023-03-14

最有可能的是,您的myapp.messageListener(或其依赖项之一)不是线程安全的,并且您看到了使用者线程之间的串扰。

最佳实践是将您的侦听器创建为无状态(类中没有突变的字段)。如果这不可能,您需要使用锁来保护共享变量。

 类似资料:
  • ActiveMQ消息组是跨多个使用者进行负载平衡的一个非常好的特性。简而言之:消息流根据消息中嵌入的组标识符()在单个队列的多个使用者之间进行分区。(因此,使用者1将获得的所有消息,使用者2将获得的所有消息,依此类推) 现在,假设您有两个队列:和,并假设在流经这两个队列的消息中使用一致的S分类法。代理为on queue选择的使用者是否与代理为on queue选择的连接相同? 但是,我们能模拟这种行

  • 我刚刚开始使用RabbitMQ和AMQP。 我有一个消息队列 我有多个消费者,我想用相同的消息做不同的事情。 RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用,负载在每个消费者之间分散。这的确是我目击的行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这里有一个消费者: 如果我启动消费者两次,我可以看到每个消费者都在以循环行为消费交替消息。

  • RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用。我有一个需求,其中希望从一个队列接收到多个订阅的消费者的相同消息。 下面是我的示例消费者代码。这里有两个侦听器在侦听同一个队列,但是只有一个使用者接收到消息。如何配置它,以便将相同的消息传递给两个消费者?(Consumer1和Consumer2)。任何帮助都将得到高度赞赏。

  • 我是ActiveMQ新手。我曾尝试在activemq中实现生产者-消费者(发送者-接收器)。在我的代码中,我很容易发送 这是我的制片人 MsgProducer。Java语言 MsgConsumer.java 有谁能帮我找出向多个消费者发送信息的方法吗。提前谢谢。

  • 问题内容: 我一般只是开始使用RabbitMQ和AMQP。 我有一条消息队列 我有多个消费者,我想用 同一条消息 做不同的事情。 RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则分散在每个使用者之间。我确实是这种行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这是一个消费者: 如果我启动使用者两次,则 可以看到每个使用者都以循环方式使用替代消息。 例如,

  • 当接收到单个队列使用者和多个生产者时,是否有可能出现消息顺序问题?在发布消息在发布消息。连接到被配置为单个使用者的队列的客户端代码应该按照和的顺序接收消息,正确吗?有时信息的接收顺序是错误的。版本为ActiveMQ Artemis 2.17.0。尽管我提到了多个生产者,消息还是使用属性从同一线程陆续发布。 我在每个消息发布时创建并关闭生产者。在相同的JVM上,我的假设是队列中发布消息的顺序,来自同