我一直在使用apache kafka-clients(确切地说是2.3.1版本)库来创建kafka使用者,其中一个分区一个使用者线程是通过以下计算实现的:
计算上的使用者线程数*计算数=主题的分区数
它过去是手动伸缩的,因此当需要增加计算的数量时,在一台计算机上运行的使用者线程不会相应减少。
我们如何使用org.springframework.kafka.config.concurrentKafKalistenerContainerFactory
实现这一点。
我正在尝试使用spring kafka 2.5.8版本。该应用程序运行在具有自动伸缩功能的k8s集群上。假设我把最大和最小豆荚设置为4,所以理想情况下
4 X使用者线程数=主题的分区数
如何配置这个使用者线程数。是通过这个:
是;或者@kafkalistener
上的concurrency
属性,该属性重写工厂的并发性。
如果您在运行时更改它,它将不会生效,除非您stop()
和start()
容器。
在构建Kafka Streams拓扑时,可以通过两种不同的方式对多个主题的读取进行建模: 读取具有相同源节点的所有主题。 选项1相对于选项2是否有相对优势,反之亦然?所有主题都包含相同类型的数据,并具有相同的数据处理逻辑。
ConsumerThread1-[topic1-0,topic2-0,topic3-0] ConsumerThread2-[topic1-1,topic2-1,topic3-1] 但是,我们希望每个主题有一个使用者线程,而不是每个分区有一个KafkaListener(或使用者线程)。例如: ConsumerThread1-[topic1-0,topic1-1] ConsumerThread2-[t
Kafka主题分区偏移位置始终从0或随机值开始,如何确保使用者记录是分区中的第一条记录?有没有办法找出答案?如果有的话,请让我知道。谢谢。
我有几个Samza工作运行所有阅读Kafka主题的消息,并为新主题编写新消息。为了发送新消息,我使用Samza内置的OutgoingMessageEnvelope。并使用MessageCollector发送新消息。它看起来是这样的:
我见过,但对于我的(简单的)用例来说,它似乎有些过头了。 我也知道,但我不想仅仅为此编写和维护代码。 我的问题是:有没有一种方法可以用kafka原生工具实现这个主题调度,而不用自己写一个Kafka-Consumer/Producer?
我已经在c中创建了kafka消费者,并创建了一个具有10个分区的主题,当我尝试使用消费者读取数据时,它仅从2个分区读取,然后说没有更多的消息。我尝试使用这两种方法,即订阅和分配,但它们都不起作用。我应该如何将所有10个分区分配给单个使用者,这是将分区分配给使用者的正确方法吗?我已经使用此存储库构建了自定义消费者 https://github.com/edenhill/librdkafka/blob