当前位置: 首页 > 知识库问答 >
问题:

Spring JMS侦听器-容器并发属性不工作

佘飞鸣
2023-03-14

大家好,我正在用ActiveMQ学习Spring JMS。在我的示例场景中,生产者应用程序在队列中发送大约50条消息,当我启动消费者应用程序时,它开始使用这些消息。

为每个侦听器启动的并发会话/使用者的数量。可以是表示最大值的简单数字(例如“5”),也可以是表示下限和上限的范围(例如“3-5”)。请注意,指定的最小值只是一个提示,可能会在运行时被忽略。默认值为1;如果有主题侦听器或队列排序很重要,则将并发限制为1;请考虑为一般队列引发它。

但在我的配置中,我将该属性设置为5,但似乎无法启动5个并发侦听器。

侦听器的配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"

    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms
    http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" />

    <bean id="listener" class="com.jms.example.MyMessageListener"></bean>

    <jms:listener-container container-type="default" concurrency="5"
        connection-factory="connectionFactory">
        <jms:listener destination="MyQueue" ref="listener"
            method="onMessage"></jms:listener>
    </jms:listener-container>

</beans>
<bean id="msgListenerContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"
        p:connectionFactory-ref="connectionFactory"
        p:destination-ref="destination"
        p:messageListener-ref="listener"
        p:concurrentConsumers="10"
        p:maxConcurrentConsumers="50" />

编辑:

消费者代码:

public class MyMessageListener implements MessageListener{


    public void onMessage(Message m) {
        TextMessage message=(TextMessage)m;
        try{
            System.out.println("Start = " + message.getText());
            Thread.sleep(5000);
            System.out.println("End = " + message.getText());
        }catch (Exception e) {e.printStackTrace();  }
    }
}

我正在控制台上打印已消耗的消息,其输出将在下面的方案中解释:

    null

这里的问题是它没有加载所有的10个消费者。有时它装载3或1。

Start = hello jms 1 // consumer 1 started 
Start = hello jms 2 // consumer 2 started 
Start = hello jms 3 // consumer 3 started 
End = hello jms 1  //  consumer 1 ended
Start = hello jms 4 // consumer 4 started and hence always 3 consumers and not 10
End = hello jms 2
Start = hello jms 5
End = hello jms 3
Start = hello jms 6

情景2:

  1. 启动生产者并发送消息(同时使用者正在运行)
  2. 由于使用者处于运行状态,它将开始使用它们。
Start = hello jms 1 // consumer 1 started 
Start = hello jms 2 // consumer 2 started 
Start = hello jms 3 // consumer 3 started 
Start = hello jms 4 // consumer 4 started 
Start = hello jms 5 // consumer 5 started 
Start = hello jms 6 // consumer 6 started 
Start = hello jms 7 // consumer 7 started 
Start = hello jms 8 // consumer 8 started 
Start = hello jms 9 // consumer 9 started 
Start = hello jms 10 // consumer 10 started. Hence all them started at same time as expected.
End = hello jms 1
Start = hello jms 11
End = hello jms 2
Start = hello jms 12
End = hello jms 3
Start = hello jms 13

请帮帮忙。

共有1个答案

佘俊茂
2023-03-14

就像Strelok告诉我的关于预取信息的事情一样。创建了PrefetchPolicybean,其中QueuePrefetch属性设置为1。其引用在ConnectionFactory中设置。

我在配置上做了一些改动,这些改动如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"

    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms
    http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"
        p:prefetchPolicy-ref="prefetchPolicy" />

    <bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"
        p:queuePrefetch="1" />

    <bean id="listener" class="com.javatpoint.MyMessageListener"></bean>

    <jms:listener-container concurrency="10-15" connection-factory="connectionFactory">
        <jms:listener destination="javatpointQueue" ref="listener"
            method="onMessage"></jms:listener>
    </jms:listener-container>

    <!-- The JMS destination -->

      <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="javatpointQueue" />
      </bean>
</beans>
 类似资料:
  • 我有一个简单的HibernateInterceptor,基本上我想自动设置几个字段。此拦截器(如下所示)扩展了EmptyInterceptor: 我使用spring配置文件进行布线,如下所示: 但是,永远无法到达拦截器。有人有什么线索吗?我还尝试将以下内容添加到事务管理器bean定义中,如下所示:

  • 如果我向JavaFx属性添加onChange侦听器, 监听器是按顺序调用的吗?如果我有一个字符串属性,然后我把字符串转到“爱丽丝”,然后转到“鲍勃”,我保证在看到“鲍勃”之前看到“爱丽丝”吗?事实上,我不在乎我是否看到“爱丽丝”,只要我看不到“鲍勃”之后 监听器是按顺序调用的吗?我的监听器有可能同时被“爱丽丝”和“鲍勃”通知调用吗?

  • 我已经使用Spring Kafka创建了一个Kafka消费者,并将其部署在云铸造中。该主题有10个分区。我计划将应用程序扩展到10个实例,以便每个实例可以使用来自一个分区的消息。Spring Kafka支持并发消息侦听器容器,我猜它支持从每个分区创建多个线程来使用。例如,如果我有5个消费者实例,每个消费者实例可能有2个线程从分区消耗。因为我计划为每个分区创建一个应用实例,所以使用并发消费者有什么好

  • 问题内容: 我的容器XML配置: 而仅仅是一个类 我已经在XML中指定了。这是什么意思 完全相同 ? 我找到了一些文档。他们没有那么有帮助的陈述: 指定要创建的并发使用者数。默认值为1。 我感兴趣的是是否必须线程安全,即 是否创建了许多实例或许多线程使用了单个实例? 如何访问不同步的实例字段? 被 实例化一次或为每个线程/实例? 确实需要是线程安全的? 问题答案: 是的,要使用并发,您的侦听器必须

  • 我正在使用。每当单元格选择值更改时,我都想收到通知。但是我的代码不起作用。我正在使用。当我更改表单元格的值时,什么也不会发生。 当我更改表格单元格的值时,什么都没有发生。

  • 问题内容: 有人知道在python中跟踪字典对象更改的任何简便方法吗?我的工作水平很高,所以我有一些方法可以处理更改字典的操作,如果字典发生更改,我想调用一个函数来基本上执行Observer / Notify。 我要避免的是所有跟踪(设置布尔值)代码。希望有一种更轻松的方式来跟踪更改。这是一个简单的情况,但是可能存在更复杂的逻辑,这将导致我不得不设置更改的标志。 问题答案: 您可以从该类派生并在任