当前位置: 首页 > 知识库问答 >
问题:

Kafka-Streams:为什么所有分区都分配给ConsumerGroup中的同一个使用者?

湛光明
2023-03-14

多台机器生成事件。这些事件被发送到我们的Kafka集群,其中每台机器都有自己的主题(app.machine-events.machine-name)。因为顺序在每台机器的基础上很重要,而分区大小现在不是问题,所以所有主题都由一个分区组成。因此,目前,N个主题也意味着N个分区。

消费/处理应用程序使用了kafka-streams,我们给出了streamsconfig.application_id_config/“application.id”“machine-event-processor”,它对每个实例都是相同的,这意味着它们会被放入Kafka的同一个使用者组中。这个使用者订阅了app.machine-events.*模式,至于处理器,它处理哪台机器的事件并不重要。通过./kafka-consumer-groups.sh--bootstrap-server localhost:9092-description-group machine-event-processor--members--verbose验证了这一点,它向我显示了一个与正在运行的所有处理服务的&IP数量匹配的列表。

预期

给定20台机器和5个处理器实例,我们希望每个处理器处理4个分区(因此处理4个主题)。

实际上

一个处理器处理20个分区(因此20个主题),其他4个处理器什么都不做/空闲。杀死“幸运”处理器,所有20个分区都被重新平衡到另一个处理器,导致新处理器处理20个分区/主题,并且3个处理器空闲。

到目前为止我所尝试的

  • 检查partition.grouper。我觉得我不是完全理解它,但就我所能找到的,反正只有DefaultPartitioner选项,编写一个自定义选项应该是不必要的,因为(根据文档)这个设置应该可以工作。它确实提到了分区会根据它们的分区键(对于我们来说都是0,因为每个主题只有一个分区)加入到任务中,但我不能完全理解这一部分。
  • 已将RoundRobinAssignor用于使用者:设置.put(StreamsConfig.ConsumerPrefix(ConsumerConfig.Partition_Assignment_Strategy_Config),new RoundRobinAssignor().GetClass.GetName)(尝试了几个值,因为似乎没有任何变化。)
  • 检查其他配置属性,看看是否遗漏了什么:我想没有。

简化的代码

val streamConfig = new Properties
// {producer.metadata.max.age.ms=5000, consumer.metadata.max.age.ms=5000, default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde, consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor, bootstrap.servers=kafka:9092, application.id=machine-event-processor, default.value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde}
val builder: StreamsBuilder = new StreamsBuilder
val topicStream: KStream[String, Array[Byte]] = builder.stream(Pattern.compile("app.machine-events.*"))
topicStream.process(new MessageProcessorSupplier(context)) // The event is delegated to a processor, doing the actual processing logic
val eventStreams = new KafkaStreams(builder.build(), streamConfig)
eventStreams.start()

备注

>

  • 正在使用Kafka-Streams 2.0.0:

    org.apache.kafka kafka-streams 2.0.0

    Kafka正在容器中运行,使用Wurstmeister/Kafka:2.11-2.0.0版本。docker-stack.yml服务:

    <代码>kafka:image:wurstmeister/kafka:2.11-2.0.0端口:-目标:9094发布:9094协议:tcp模式:主机卷:-/var/run/docker.sock:/var/run/docker.sock健康检查:测试:[“cmd-shell”,“$$(netstat-ltn grep-q 9092)”]间隔:15s超时:10s重试:5环境:HOSTNAME_COMMAND:“docker info grep^名称:cut-d''-f 2”kafka_zookeeper_connect:zookeeper:2181

    • Kafka设置在双节点设置中,形成集群。通过docker环境变量,我们将replication因子设置为2,因此每个分区在每个节点上都应该有一个复制。

    我找到并检查的相关主题/问题/讨论

    >

  • KIP-49

    https://faust.readthedocs.io/en/latest/developerguide/partition_assignor.html

    我查了Kafka的邮件档案,但没有找到任何东西

    签出的流示例应用程序

    全方位地寻找别人遇到的这个问题,但没有给我我需要的答案。我还发现了KAFKA-7144,但这对我们来说不应该是一个问题,因为我们正在运行2.0.0

    如果有人遇到过类似的问题,或者能指出我可能很愚蠢的错误,请指点我!

  • 共有1个答案

    竺展
    2023-03-14

    对于将来遇到同样问题的读者,解决方案是不使用每个有1个分区的N个主题,而是使用有N个分区的1个主题。即使有120个分区和400+个机器/事件源,多个事件类型也会被放入同一个分区,但这不会影响事件的顺序。

    实现方法是将记录键设置为机器名,并让底层逻辑负责哪个值进入哪个分区。由于我们现在有一个消费者组,其中有X个消费者订阅了本主题,因此分区被平均分配给消费者,每个消费者使用120/X个分区。

    这正如Matthias所建议的,在Devoxx Belgium 2018展会上,来自Confluent的其他乐于助人的人士进一步证实了这一点。谢谢你!

    小费

    使用WurstMeister/Kafka docker映像时,请考虑使用environment属性:

    kafka_create_topics:“app.machine-events:120:2”

    意义

    主题名称:分区数:复制因子

     类似资料:
    • 我有10个消费者和10个分区。我取分区数 并且使用相同的group.id创建相同数量的消费者。 我也发现很少这样的日志->

    • Kafka-来自同一组的多个使用者分配了相同的分区 我刚刚开始学习Kafka和诺德。我已经写了一篇关于消费者的文章如下 输出 有四个分区。 编辑 我使用了,如下所示。 生产者正在发送100条消息,收到的消息如下。这就是我如何知道分配的分区(不是从对象)。 当我运行两个这样的使用者实例(相同的主题和组)时,其中只有一个接收来自分区0的所有内容。这不是问题吗? 这是生产商代码。

    • 我有一个带有15个分区的kafka主题[0-14],我正在运行带有5个并行的flink。因此,理想情况下,每个并行flink使用者应该分别使用3个分区。但即使在多次重启之后,很少有Kafka分区不被任何flink工人订阅。 注意:如果我以1个并行度开始作业,则作业工作非常好。 Flink版本:1.3.3

    • 假设我每个执行器有36个核心,每个节点有一个执行器,以及3个节点,每个节点有48个可用核心。我注意到的基本要点是,当我将每个任务设置为使用1个内核(默认值)时,我对workers的CPU利用率约为70%,每个执行器将同时执行36个任务(正如我所预期的那样)。然而,当我将配置更改为每个任务有6个内核时(conf spark.task.cpus=6),每个执行器一次会减少到6个任务(如预期的那样),但

    • 问题内容: 这是在最近的PyCon演讲中提出的。 该声明 没有任何意义,但是也不会引发异常。我觉得这一定是由于拆箱规则造成的。您也可以使用列表对元组进行解包,例如, 符合您的期望。作为逻辑结果,当要拆包的元素数为0时,这也应该起作用,这将解释为什么分配给空列表是有效的。当您尝试将非空列表分配给空列表时会发生什么,进一步支持了该理论: 如果元组也是如此,我将对此解释感到满意。如果我们可以解压缩到包含

    • 我不确定我是否做错了什么--但如果有人能帮助澄清这一点。 场景: null