当前位置: 首页 > 知识库问答 >
问题:

如何使用队列限制过程(并行)创建从kafka读取的spring集成流

羿经武
2023-03-14

我想创建一个异步读取kafka消息的流,并使用队列通道累积大量要处理的消息,并且只有在处理完这些消息(例如50条消息)后,它才能处理另外50条消息,或者在释放队列中的空间时。我尝试使用一个从kafka委托读取到另一个流的流,该流具有一个带有PollerMetadata(Pollers.fixedDelay(500))的QueueChannel。maxMessagesPerPoll(50))但是轮询器使用一个线程来读取那里的消息,我无法并行处理这50条消息,如果我在轮询器中放置一个执行器,它将像正常的执行器一样工作,它将累积消息,并且不会挂起到50,直到我有一个新线程可供他从Kafka获取另一条消息。

目标是并行处理多达50条kafka消息,但他只在这个队列释放时在kafka(consumer.pool)中再次读取,但他无限地从kafka读取并在执行器或轮询器的限制量内进行处理,我可以使用带有kafka的Spring集成流来实现这个目标吗?

每个消费者主题只有这个配置就足够了?日志总是打印相同的线程:[ntainer#0-1-C-1]即使我为并发设置了10

块引用

> Kafka.messageDrivenChannelAdapter(consumerFactory,
> topic).configureListenerContainer { kafkaMessageListenerContainer ->  
> kafkaMessageListenerContainer.concurrency(concurrency)               
> kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
> }
>                 .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)

共有1个答案

寿丰
2023-03-14

您不应该使用队列通道或使用Kafka执行任何异步处理。跟踪主题/分区内的偏移量太难了。您将面临丢失消息的风险。

相反,要增加并发性,请增加主题中的分区数,并设置侦听器容器并发性以获得所需的使用者数(例如50)。

您通常应该拥有比消费者更多的分区,但您至少需要同样多的分区,因为组中只有一个消费者可以使用分区。

 类似资料:
  • 在一个Spring Boot应用程序中,我想使用Spring集成从一个Kafka队列中读取。配置了以下内容: bean实现。 调试时,很明显,在(stacktrace的顶部)中,为空。 在Spring集成中,将通道连接到应该处理消息的bean的正确方法是什么?

  • 我有以下设置: 外部应用程序写入到jms队列 jms队列也是外部的,所以这里不可能进行配置 运行在Glassfish上的现有应用程序,使用ActiveMQ将服务器连接到jms队列 现有的应用程序是一个MessageDriven bean,实现了MessageListener接口(当消息放在队列中时,当前正在处理消息) 新的要求是消息只能在02:00到04:00的时间段内处理 如何最好地限制Mess

  • 问题内容: 区别在于消息来自Http端点而不是JMS队列。问题是由于某些原因而无法填充消息通道,或者Flux.from()不会拾取它。日志条目显示GenericMessage是从Http Integration流中创建的,并带有有效负载作为路径变量,但是没有入队/未发布到通道?我尝试过并且 没有任何区别,事件流为空。这是代码: UPDATE1: build.gradle 更新2 当和在一个文件中定

  • 问题内容: 我正在尝试使用Spring Reactor 3组件和Spring Integration从JMS队列创建反应式流(Flux)。 我正在尝试从JMS队列(使用Spring Integration的ActiveMQ)创建客户端的响应流(Spring Reactor 3 Flux),以使客户端异步获取JMS消息。我相信我已经正确连接了所有东西,但是在服务器停止之前,客户端不会收到任何JMS消

  • 需要帮助吗 我需要创建多个并行执行的sqs队列使用者,但我不知道如何使用Sprint集成来实现这一点 我有以下架构 包含20万条消息的Amazon SQS队列 一个包含5个EC2实例的Amazon堆栈,每个实例都有tomcat服务器,运行一个Spring Boot应用程序,该应用程序具有Spring集成流,该集成流使用Spring集成aws的SQS消息驱动通道适配器来消费SQS的消息(https:

  • 我正在迁移一个Kafka Streams实现,它使用纯Kafka apis来使用sping-kafka,因为它被合并在sping-引导应用程序中。 一切都很好Stream,GlobalKtable,分支,我所有的工作都非常好,但我很难合并ReadOnlyKeyValueStore。基于这里的sping-kafka留档:https://docs.spring.io/spring-kafka/docs