使用redis stream实现消息队列,消费者在消费时每次都是同一个线程,无法并行消费!!!
创建StreamMessageListenerContainer
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ? extends ObjectRecord<String, ?>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(10))
.batchSize(10)
.targetType(targetClass)
.executor(taskExecutor)
.build();
线程池配置
@Configuration
@EnableAsync
public class RedisExecutorConfig {
@Bean("redisTaskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(20);
taskExecutor.setMaxPoolSize(35);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("redis-container-thread-");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}
啥原因呢,为啥这个线程池设置了没效果,每次都是同一个线程名称,有啥好的方法设置消费端并行消费呢?
/**
* Configure a {@link Executor} to run stream polling {@link Task}s.
*
* @param executor must not be null.
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
*/
public StreamMessageListenerContainerOptionsBuilder<K, V> executor(Executor executor) {
Assert.notNull(executor, "Executor must not be null!");
this.executor = executor;
return this;
}
进入public StreamMessageListenerContainerOptionsBuilder<K, V> executor(Executor executor)方法查看源码发现注释翻译过来是配置{@link Executor}以运行流轮询{@link Task},难道是这个executor不支持多线程消费吗?
在Spring Data Redis中,使用StreamMessageListenerContainer
并配置executor
以支持多线程消费是正确的方法。然而,如果你发现所有的消息处理都在同一个线程上执行,这可能是由于几个原因造成的:
StreamMessageListenerContainer
确实使用了配置的executor
。从你提供的代码看,你已经正确地将taskExecutor
设置为了StreamMessageListenerContainerOptions
的executor
。我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨
这是一个关于Kafka和信息如何被消费的非常基本的问题,但不幸的是,我在这一点上找不到任何答案。 假设我想过度分区,那么我将得到比消费者多10倍的分区。过度分区是必需的,因为我希望能够扩展(在未来并行处理更多的消息)。 1 个主题分为 1000 个分区,由 100 个使用者使用 =- 我的问题是: > 消息是如何为每个消费者消费的:它是以循环方式完成的吗?如果不是,分发是如何完成的? 有没有保证消
本文向大家介绍Python中线程的MQ消息队列实现以及消息队列的优点解析,包括了Python中线程的MQ消息队列实现以及消息队列的优点解析的使用技巧和注意事项,需要的朋友参考一下 “消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。相
消息传递 稍加考虑,上一节的练习题其实是不完整的,它只是评分系统中的一环,一个评分系统是需要先把信息从数据库或文件中读取出来,然后才是评分,最后还需要把评分结果再保存到数据库或文件中去。如果一步一步串行地做这三个步骤,是完全没有问题的。那么我们是否可以用三个线程来分别做这三个步骤呢?上一节练习题我们已经用了一个线程来实现评分,那么我们是否也可以再用一个线程来读取成绩,再用另个线程来实现保存呢? 如
需要帮助吗 我需要创建多个并行执行的sqs队列使用者,但我不知道如何使用Sprint集成来实现这一点 我有以下架构 包含20万条消息的Amazon SQS队列 一个包含5个EC2实例的Amazon堆栈,每个实例都有tomcat服务器,运行一个Spring Boot应用程序,该应用程序具有Spring集成流,该集成流使用Spring集成aws的SQS消息驱动通道适配器来消费SQS的消息(https:
主要内容:1 start启动服务定时清理过期消息,1.1 cleanExpireMsg清理过期消息,1.2cleanExpiredMsg清理过期消息,2 submitConsumeRequest提交消费请求,2.2 submitConsumeRequestLater延迟提交,2.2 consumeMessageBatchMaxSize和pullBatchSize,3 ConsumeRequest执行消费任务,,,,基于RocketMQ release-4.9.3,深入的介绍了ConsumeMes