当前位置: 首页 > 知识库问答 >
问题:

基于spring boot,如何在Redis Stream中实现多线程并行消费消息队列?

益清野
2024-08-21

使用redis stream实现消息队列,消费者在消费时每次都是同一个线程,无法并行消费!!!

创建StreamMessageListenerContainer

StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ? extends ObjectRecord<String, ?>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    .pollTimeout(Duration.ofSeconds(10)) 
                    .batchSize(10) 
                    .targetType(targetClass)
                    .executor(taskExecutor)
                    .build();

线程池配置

@Configuration
@EnableAsync
public class RedisExecutorConfig {

    @Bean("redisTaskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(20);
        taskExecutor.setMaxPoolSize(35);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("redis-container-thread-");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

}

啥原因呢,为啥这个线程池设置了没效果,每次都是同一个线程名称,有啥好的方法设置消费端并行消费呢?

        /**
         * Configure a {@link Executor} to run stream polling {@link Task}s.
         *
         * @param executor must not be null.
         * @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
         */
        public StreamMessageListenerContainerOptionsBuilder<K, V> executor(Executor executor) {

            Assert.notNull(executor, "Executor must not be null!");

            this.executor = executor;
            return this;
        }

进入public StreamMessageListenerContainerOptionsBuilder<K, V> executor(Executor executor)方法查看源码发现注释翻译过来是配置{@link Executor}以运行流轮询{@link Task},难道是这个executor不支持多线程消费吗?

共有1个答案

张浩阔
2024-08-21

在Spring Data Redis中,使用StreamMessageListenerContainer并配置executor以支持多线程消费是正确的方法。然而,如果你发现所有的消息处理都在同一个线程上执行,这可能是由于几个原因造成的:

  1. 配置问题:确保你的StreamMessageListenerContainer确实使用了配置的executor。从你提供的代码看,你已经正确地将taskExecutor设置为了StreamMessageListenerContainerOptionsexecutor
  2. 消费者组(Consumer Group)的单一消费者实例:如果你的Redis Stream消费者组只有一个消费者实例,那么即使你配置了多线程,也只有这一个消费者实例会接收消息。Redis Stream的消费者组设计是确保消息在每个消费者组内只被处理一次。如果你想要并行处理,你需要在同一个消费者组内增加更多的消费者实例,或者在多个消费者组之间进行分布。
  3. Spring Boot版本和依赖库版本:确保你使用的Spring Boot和Spring Data Redis的版本支持多线程消息监听。通常,较新的版本会包含对多线程和并发更好的支持。
  4. 日志和调试:查看应用的日志,特别是关于Spring Data Redis和线程池的日志,以确认是否有任何异常或错误导致线程池没有按预期工作。此外,你可以在消费者方法中添加日志记录,以查看是哪个线程正在处理消息。
  5. Redis配置:检查Redis服务器的配置,确保没有限制连接数或队列的并行处理能力。
  6. 代码验证:在你的消息处理逻辑中,确保没有单线程阻塞的操作(如同步等待外部资源),这可能会阻止其他线程的执行。

解决方案

  1. 增加消费者实例:如果你是在一个Spring Boot应用中,考虑创建多个应用实例或使用Spring Cloud Stream等框架来支持多个消费者实例。
  2. 使用多个消费者组:如果你的应用可以处理这种架构,你可以将消息分发到多个消费者组,每个消费者组有自己的消费者实例和线程池。
  3. 验证和测试:确保你的配置正确无误,并进行充分的测试来验证多线程消费是否按预期工作。
  4. 查阅文档和社区:查看Spring Data Redis和Spring Boot的官方文档,以及相关的社区论坛和Stack Overflow问题,看看是否有其他人遇到了类似的问题并找到了解决方案。
  5. 更新依赖:如果可能的话,尝试更新你的Spring Boot和Spring Data Redis依赖到最新版本,以获得更好的性能和功能支持。
 类似资料:
  • 我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨

  • 这是一个关于Kafka和信息如何被消费的非常基本的问题,但不幸的是,我在这一点上找不到任何答案。 假设我想过度分区,那么我将得到比消费者多10倍的分区。过度分区是必需的,因为我希望能够扩展(在未来并行处理更多的消息)。 1 个主题分为 1000 个分区,由 100 个使用者使用 =- 我的问题是: > 消息是如何为每个消费者消费的:它是以循环方式完成的吗?如果不是,分发是如何完成的? 有没有保证消

  • 本文向大家介绍Python中线程的MQ消息队列实现以及消息队列的优点解析,包括了Python中线程的MQ消息队列实现以及消息队列的优点解析的使用技巧和注意事项,需要的朋友参考一下 “消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。相

  • 消息传递 稍加考虑,上一节的练习题其实是不完整的,它只是评分系统中的一环,一个评分系统是需要先把信息从数据库或文件中读取出来,然后才是评分,最后还需要把评分结果再保存到数据库或文件中去。如果一步一步串行地做这三个步骤,是完全没有问题的。那么我们是否可以用三个线程来分别做这三个步骤呢?上一节练习题我们已经用了一个线程来实现评分,那么我们是否也可以再用一个线程来读取成绩,再用另个线程来实现保存呢? 如

  • 需要帮助吗 我需要创建多个并行执行的sqs队列使用者,但我不知道如何使用Sprint集成来实现这一点 我有以下架构 包含20万条消息的Amazon SQS队列 一个包含5个EC2实例的Amazon堆栈,每个实例都有tomcat服务器,运行一个Spring Boot应用程序,该应用程序具有Spring集成流,该集成流使用Spring集成aws的SQS消息驱动通道适配器来消费SQS的消息(https:

  • 主要内容:1 start启动服务定时清理过期消息,1.1 cleanExpireMsg清理过期消息,1.2cleanExpiredMsg清理过期消息,2 submitConsumeRequest提交消费请求,2.2 submitConsumeRequestLater延迟提交,2.2 consumeMessageBatchMaxSize和pullBatchSize,3 ConsumeRequest执行消费任务,,,,基于RocketMQ release-4.9.3,深入的介绍了ConsumeMes