当前位置: 首页 > 知识库问答 >
问题:

用spring kafka 2.5.8版本实现每个kafka主题分区一个用户线程

宋典
2023-03-14

我一直在使用apache kafka-clients(确切地说是2.3.1版本)库来创建kafka使用者,其中一个分区一个使用者线程是通过以下计算实现的:

计算上的使用者线程数*计算数=主题的分区数

它过去是手动伸缩的,因此当需要增加计算的数量时,在一台计算机上运行的使用者线程不会相应减少。

我们如何使用org.springframework.kafka.config.concurrentKafKalistenerContainerFactory实现这一点。

我正在尝试使用spring kafka 2.5.8版本。该应用程序运行在具有自动伸缩功能的k8s集群上。假设我把最大和最小豆荚设置为4,所以理想情况下

4 X使用者线程数=主题的分区数

如何配置这个使用者线程数。是通过这个:

共有1个答案

甄阳朔
2023-03-14

是;或者@kafkalistener上的concurrency属性,该属性重写工厂的并发性。

如果您在运行时更改它,它将不会生效,除非您stop()start()容器。

 类似资料:
  • 在构建Kafka Streams拓扑时,可以通过两种不同的方式对多个主题的读取进行建模: 读取具有相同源节点的所有主题。 选项1相对于选项2是否有相对优势,反之亦然?所有主题都包含相同类型的数据,并具有相同的数据处理逻辑。

  • ConsumerThread1-[topic1-0,topic2-0,topic3-0] ConsumerThread2-[topic1-1,topic2-1,topic3-1] 但是,我们希望每个主题有一个使用者线程,而不是每个分区有一个KafkaListener(或使用者线程)。例如: ConsumerThread1-[topic1-0,topic1-1] ConsumerThread2-[t

  • Kafka主题分区偏移位置始终从0或随机值开始,如何确保使用者记录是分区中的第一条记录?有没有办法找出答案?如果有的话,请让我知道。谢谢。

  • 我有几个Samza工作运行所有阅读Kafka主题的消息,并为新主题编写新消息。为了发送新消息,我使用Samza内置的OutgoingMessageEnvelope。并使用MessageCollector发送新消息。它看起来是这样的:

  • 我见过,但对于我的(简单的)用例来说,它似乎有些过头了。 我也知道,但我不想仅仅为此编写和维护代码。 我的问题是:有没有一种方法可以用kafka原生工具实现这个主题调度,而不用自己写一个Kafka-Consumer/Producer?

  • 我已经在c中创建了kafka消费者,并创建了一个具有10个分区的主题,当我尝试使用消费者读取数据时,它仅从2个分区读取,然后说没有更多的消息。我尝试使用这两种方法,即订阅和分配,但它们都不起作用。我应该如何将所有10个分区分配给单个使用者,这是将分区分配给使用者的正确方法吗?我已经使用此存储库构建了自定义消费者 https://github.com/edenhill/librdkafka/blob