当前位置: 首页 > 知识库问答 >
问题:

Kafka流——有可能减少由多个聚合创建的内部主题的数量吗

姬墨竹
2023-03-14

我有一个Kafka Streams应用程序,它可以根据几个值对传入的消息进行分组。例如:

示例消息:

{ "gender": "female", "location": "canada", "age-group": "25-30" }

拓扑:

table
    .groupBy((key, value) -> groupByGender) // example key: female
    .count("gender-counts");

table
    .groupBy((key, value) -> groupByLocation) // example key: canada
    .count("location-counts");

table
    .groupBy((key, value) -> groupByAgeGroup) // example key: 25-30
    .count("age-group-counts");

这导致了很多话题:

my-consumer-gender-counts-changelog
my-consumer-gender-counts-repartition
my-consumer-location-counts-changelog
my-consumer-location-counts-repartition
my-consumer-age-group-counts-changelog
my-consumer-age-group-counts-repartition

如果我们可以将多个聚合发送到单个状态存储,并将group by value作为密钥的一部分,那就太好了。例如:

table
    .groupBy((key, value) -> groupByGender) // example key: female_gender
    .count("counts");

table
    .groupBy((key, value) -> groupByLocation) // example key: canada_location
    .count("counts");

table
    .groupBy((key, value) -> groupByAgeGroup) // example key: 25-30_age_group
    .count("counts");

这将导致主题少得多:

counts-changelog
counts-repartition

这在目前看来是不可能的(无论如何使用DSL),因为使用groupBy操作符会创建一个用于重新分区的内部主题,所以如果我们有多个子拓扑groupBy不同的东西,那么Kafka Streams将尝试从多个源注册相同的重新分区主题。这会导致以下错误:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic counts-repartition has already been registered by another source.
        at org.apache.kafka.streams.processor.TopologyBuilder.validateTopicNotAlreadyRegistered(TopologyBuilder.java:518)

如果groupBy可以返回多个记录(例如像flatMap那样),那么我们可以返回一组记录(每个分组一个记录),但使用DSL似乎也不可能。

我的问题是,如果一个记录可以按多个值进行分组(例如,{“性别”:“女性”,“地点”:“加拿大”,“年龄组”:“25-30”}),那么创建多个主题(每个分组2个)是否值得关注(例如,我们有100个不同的分组)?当一个记录可以由多个值分组时,还有其他策略更适合吗?我的建议(将多个聚合下沉到一个变更日志主题)是个坏主意吗(即使唯一键的数量非常少)?

共有1个答案

常哲彦
2023-03-14

如果要按不同的属性进行分组,则无法避免多个重新分区主题。假设您有两个分组属性g1g2以及具有以下值的三个记录:

r1 = g1:A, g2:1
r2 = g1:A, g2:2
r3 = g1:B, g2:2

因此,为了基于g1正确聚合记录,必须将r1r2记录分组在一起。假设您的重新分区主题有两个分区p1p2,则记录将得到如下重新分配:

p1: r1, r2
p2: r3,

另一方面,如果在r2上进行聚合,则必须将r2r3记录分组在一起:

p1: r1
p2: r2,r3

请注意,r2对于这两种情况都必须转到不同的分区,因此,不可能使用单个主题,但每个分组需要一个主题。(这不是Kafka特有的——任何其他框架也需要多次复制和重新发布日期)。

理论上,如果你添加更多的语义信息(比如超键、子键或1对1键映射),可以减少主题的数量。但是Kafka Streams不支持这一点(和AFAIK,没有其他类似的系统)。

 类似资料:
  • 我正在尝试设置一个安全的Kafka集群,但在ACL方面遇到了一些困难。 Kafka流的汇流安全指南(https://docs.Confluent.io/current/Streams/developer-guide/security.html)只说明必须将集群创建ACL交给主体...但它没有说任何关于如何实际处理内部话题的内容。 通过研究和实验,我确定(对于Kafka版本1.0.0): 通配符不能

  • 其中一个Kafka流应用程序在Kafka代理和消费者端产生了大量未知生产者ID错误。 流配置如下: 消费者方面的错误: 这背后的原因是什么?

  • 我有2个Kafka的主题流完全相同的内容从不同的来源,所以我可以有高可用性的情况下,其中一个来源失败。我正在尝试使用Kafka Streams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何关于失败的消息,并且当所有源都启动时没有重复的消息。 当使用KStream的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主主题关闭时,将不会向输出主题发送任何内容。这似乎是因为,

  • 我有自己的Spring Cloud数据流处理器,里面有Python,我使用这个示例作为指导:https://dataflow.Spring.io/docs/recipes/polyglot/processor/。然后我想缩放并创建其中的三个处理器,因此使用创建了3个Python内部的POD。我稍微修改了示例中的一段代码:当我创建一个Kafka消费者时,我也会传递一个组id,因此消息应该是负载平衡的

  • 我在Kafka Streams拓扑工作,有时,在更改应用程序ID和/或clientId属性后,我在特定的kafka流上收到错误:“”。我已经在每个Kafka节点的server.properties中设置了属性,但似乎没有创建此流的主题。 这是我的Kafka Streams拓扑: