当前位置: 首页 > 知识库问答 >
问题:

如何使Spring JMSListener突发到最大并发线程数?

宋智明
2023-03-14

我有一个使用ActiveMQ5.10版的Spring JMS应用程序。我正在执行一个简单的并发测试。我正在使用Spring Boot、当前版本和注释来配置JMSListener和消息生产者。

消息生成器只是尽可能快地在队列中抛出消息。消息侦听器将消息从队列中取出,但在获得消息后会Hibernate1秒--模拟消息侦听器在获得消息后需要做的一些工作。

我将JMSListener设置为100-1000个并发线程。如果我同时启动消息生产者和消费者(两者都运行在各自的JVM中),那么消费者永远不会超过最小配置线程,即使最大范围设置为1000。

如果我让生产者首先启动并在队列中放置几千条消息,然后启动消费者的一个或多个实例,它将稳定地增加线程,从每秒100个线程开始,然后20个线程左右,直到队列中有大约20-30条正在运行的消息。它从不捕获生产者--队列中总是有一些消息,即使消费者不接近它的maxConcurrency计数。

@Component
public class ClientServiceImpl implements ClientService {

    private static final String QUEUE="message.test.queue";

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void submitMessage(ImportantMessage importantMessage) {

        System.out.println("*** Sending " + importantMessage);
        jmsTemplate.convertAndSend(QUEUE, importantMessage);

    }
}
@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(AmqConsumerApplication.class, args);
    }
    @Value("${JMSHost}")
    private String JMS_BROKER_URL;

    @Autowired
    static Command command;

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
        ((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
        ((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
        ((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false);
        return factory;
    }

}
@Component
public class TransformationListener {

    private static final String QUEUE="message.test.queue?consumer.prefetchSize=10";

    @JmsListener(destination=QUEUE, concurrency = "100-1000")
    public void handleRequest(ImportantMessage importantMessage) {
        System.out.println("*** Recieved message: " + importantMessage + " on thread" + Thread.currentThread().getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

共有1个答案

穆远
2023-03-14

你还在面对这种行为吗?您是否在http://activemq.apache.org/what-is-the-prefetch-limit-for.html上阅读了这篇“pooled consumer and prefetch”的建议,您是否尝试使用prefetchsize=0或1?我想1能解决你的问题。如果prefetchSize大于1,可能需要将AbortSlowAckConsumerStrategy降低到低于默认的30s。在您的情况下,要有超过100个线程消耗消息,需要队列中超过1000个未消耗和未预取的消息,因为prefetchSize是10。

 类似资料:
  • 本文向大家介绍Java多线程并发编程 并发三大要素,包括了Java多线程并发编程 并发三大要素的使用技巧和注意事项,需要的朋友参考一下 一、原子性 原子,一个不可再被分割的颗粒。原子性,指的是一个或多个不能再被分割的操作。 int i = 1; // 原子操作 i++; // 非原子操作,从主内存读取 i 到线程工作内存,进行 +1,再把 i 写到朱内存。 虽然读取和写入都是原子操作,但合起来就不

  • 并发是什么?引用Rob Pike的经典描述: 并发是同一时间应对多件事情的能力 其实在我们身边就有很多并发的事情,比如一边上课,一边发短信;一边给小孩喂奶,一边看电视,只要你细心留意,就会发现许多类似的事。相应地,在软件的世界里,我们也会发现这样的事,比如一边写博客,一边听音乐;一边看网页,一边下载软件等等。显而易见这样会节约不少时间,干更多的事。然而一开始计算机系统并不能同时处理两件事,这明显满

  • 因为Websockets构建在TCP之上,所以我的理解是,除非端口在连接之间共享,否则您将受到64K端口限制的约束。但我也看到过使用Gretty进行512K连接的报告。所以我不知道。

  • 在大部分现代操作系统中,执行中程序的代码运行于一个 进程(process)中,操作系统则负责管理多个进程。在程序内部,也可以拥有多个同时运行的独立部分。这个运行这些独立部分的功能被称为 线程(threads)。 将程序中的计算拆分进多个线程可以改善性能,因为程序可以同时进行多个任务,不过这也会增加复杂性。因为线程是同时运行的,所以无法预先保证不同线程中的代码的执行顺序。这会导致诸如此类的问题: 竞

  • 本章讲解 Rust 中,并发,并行,多线程编程的相关知识。

  • 问题内容: 我目前正在尝试使用Go进行一些实验。这是我正在尝试做的事情: 我有一个REST API服务正在运行,我想在尽可能多的Goroutine中反复查询特定的URL,以查看这些响应的性能如何(通过查看我的REST API服务器日志)。在退出程序之前,我想发送总计100万个HTTP请求-在计算机允许的范围内同时执行。 我知道有一些工具可以做到这一点,但是我主要对如何使用goroutines在Go