当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring Boot使几个线程从RabbitMQ队列中取出?

芮化
2023-03-14

我们的应用程序使用RabbitMQ提供的几个队列中的数据。为了提高吞吐量,我们在每个队列中启动多个线程,这些线程从这些队列中进行阻塞。

对于一个新的服务,我们希望使用Spring Boot,并且每个队列都有几个线程从这些队列中获取数据。下面是用于处理从某个队列到达的数据的规范Spring Boot代码:

@StreamListener(target = Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<SomeData> process(Message<SomeData> message) {
    SomeData result = service.process(message.getPayload());
    return MessageBuilder
            .withPayload(result)
            .copyHeaders(message.getHeaders())
            .build();
}

共有1个答案

裴金鑫
2023-03-14

您可以在配置队列时为其设置并发使用者。

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory
        (MessageConverter contentTypeConverter,
         SimpleRabbitListenerContainerFactoryConfigurer configurer) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    // the number of consumers is set as 5
    factory.setConcurrentConsumers(5);

    configurer.configure(factory, connectionFactory);
    factory.setMessageConverter(contentTypeConverter);
    return factory;
}
 类似资料:
  • RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用。我有一个需求,其中希望从一个队列接收到多个订阅的消费者的相同消息。 下面是我的示例消费者代码。这里有两个侦听器在侦听同一个队列,但是只有一个使用者接收到消息。如何配置它,以便将相同的消息传递给两个消费者?(Consumer1和Consumer2)。任何帮助都将得到高度赞赏。

  • 我有一个应用程序,在这个应用程序中,我可以在进程的一部分中以JSON格式将消息写入Azure服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将json转换为一个对象,然后处理该对象。 我没有问题将消息推送到队列上,但我还没有找到任何将消息从队列中逐一或循环弹出的示例。我在微软或Github上看到的每一个例子都是一个控制台应用程序(在网络应用程序中毫无用处),它设置了某种侦听器,可以抓取队

  • 我花了整整一天的时间来尝试让spring-AMQP示例项目在docker版本的RabbitMQ上运行。我只是在运行标准的rabbitmq Docker。虽然我没有连接问题,但我总是得到与创建队列相关的异常,并且我已经尝试了所有可能的变体,在这一点上。 我尝试在我的配置中声明队列,就像示例项目一样。我尝试显式配置RabbitAdmin。我已经尝试显式配置整个自动配置混乱。我在rabbitmq中创建了

  • 我是新手RabbitMQ java客户端。我的问题:我创建了10个consumer并将它们添加到队列中。每个消费者使用10秒来处理我的流程。我检查了Rabbit的页面,我看到我的队列有4000条消息没有发送到客户端。我检查了日志客户端,结果是为一个消费者获取一条消息,10秒后,我为一个消费者获取一条消息,依此类推…我想要得到10个消息为所有消费者在当时(10个消息-10消费者过程在当时)请帮助我,

  • 这就是事情。 我正在使用PHP AMQP从Rabbitmq读取结果队列,以便处理发送的每封电子邮件上的重要信息。完成后,我需要将该消息删除或标记为已写入,以便下次读取队列时,不会得到已处理的消息。 由于Rabbitmq服务器每小时发送超过10.000封电子邮件,每次我读取队列以处理结果发送时,脚本至少可以运行5分钟,以便处理队列中的所有消息,因此在完成后,在这5分钟内会发送数百条新消息。这使得我无

  • 如果正在运行的线程少于corePoolSize线程,则执行器宁愿添加一个新线程,而不是排队。2)如果corePoolSize或更多线程正在运行,则执行器更喜欢将请求排队,而不是添加新线程。 如果请求无法排队,将创建一个新线程,除非该线程将超过maximumPoolSize,在这种情况下,任务将被拒绝。 第一种情况是可以的,但我想要的是,当核心线程被利用时,任务不需要排队(即使在有界队列的情况下,比