当前位置: 首页 > 知识库问答 >
问题:

KTable KTable外键联接在主题具有多个分区时不生成所有消息

吕昀
2023-03-14

请参阅下面的更新以显示潜在的解决方案

我们的应用程序使用2个主题作为KTables,执行左连接,并输出到一个主题。在测试过程中,我们发现当我们的输出主题只有一个分区时,这项功能可以正常工作。当我们增加分区的数量时,我们注意到生成到输出主题的消息数量减少了。

在启动应用程序之前,我们用多个分区配置测试了这一理论。使用1个分区,我们可以看到100%的消息。使用2,我们可以看到一些消息(少于50%)。对于10,我们几乎看不到任何(低于10%)。

因为我们还没有加入,所以主题1中使用的每一条消息都应该写入我们的输出主题,但我们发现这并没有发生。消息似乎被困在从Ktables的外键连接创建的“中间”主题中,但没有错误消息。

任何帮助将不胜感激!

Service.java

@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}

建筑格拉德尔

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

...

ext {
    set('springCloudVersion', "Hoxton.SR6")
}

...

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'

注意:我们排除了org.apache.kafka的依赖关系,因为sping-Cloud-stream中包含的版本存在错误

application.yml

spring:
  application:
    name: app-name
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: ${spring.application.name}
        process-in-1:
          destination: topic2
          group: ${spring.application.name}
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BROKERS}
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2

测试场景:

为了提供一个具体示例,如果我将以下3条消息发布到主题1:

{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}

输出主题将仅接收2条消息。

{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}

另外两个怎么了?似乎某些键/值对无法写入输出主题。重试这些“丢失”的消息也不起作用。

更新:

通过将主题1作为KStream而不是KTable来使用,并在继续执行KTable-KTable连接之前调用toTable(),我可以使其正常工作。我仍然不确定我最初的解决方案为什么不起作用,但希望这个解决方案能够对实际问题有所帮助。

@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .map(...)
                    .toTable()
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}

共有3个答案

卫嘉言
2023-03-14

这是一个奇怪的问题,我从未听说过有许多输出主题分区控制数据写入频率。但是我知道只有当缓存已满时,toStream()才会将数据写入下游,所以请尝试设置cache。最大字节数。缓冲=0。此外,KTable只保留每个键的最新记录,因此如果对同一个键有多个值,则只有最新的值会保留并写入下游。

许焕
2023-03-14

选择加入主题的键可能会有所帮助。主题的分区配置应该相同。

return (topicOne, topicTwo) ->
        topicOne
            .leftJoin(topicTwo,
                value -> MyOtherKey.newBuilder()
                    .setFieldA(value.getFieldA())
                    .setFieldB(value.getFieldB())
                    .build(),
                this::enrich)
            .toStream().selectKey((key, value) -> key);
归松
2023-03-14

根据对问题的描述,KTable输入主题(左)中的数据似乎没有按其键正确分区。对于单个分区主题,只有一个分区,所有数据都进入这个分区,连接结果完成。

但是,对于多分区的输入主题,您需要确保数据是按键分区的,否则,具有相同键的两个记录可能会在不同的分区中结束,从而导致连接失败(因为连接是在每个分区的基础上完成的)。

请注意,即使外键联接不要求两个输入主题都是共分区的,但仍然要求每个输入主题本身都按其键进行分区!

如果使用map()。toTable()您基本上会触发数据的内部重新分区,以确保数据按键进行分区,这就解决了问题。

 类似资料:
  • 我们希望使用Kafka connect sink连接器将消息从Kafka复制到Mongo DB。在我们的用例中,我们有多个主题,每个主题都有一个分区(主题的名称可以用正则表达式表示,例如topic.XXX.name)。这些主题的数量在不断增加。我想知道Kafka connect架构是否适合这个用例。如果是这样,如何配置它的增益高可缩放性和并行性?任务是什么。最大值?工人数量?

  • 我想要任何关于Kafka如何维护消息序列的信息/解释,当消息被写入多个分区的主题时。例如,我有多个消息生成器,每个消息生成器按顺序生成消息,并用超过1个分区编写Kafka主题。在这种情况下,消费者组将如何工作来消费消息。

  • 问题内容: 只是为了扩展我以前的问题-我的数据库中有两个表,我想提取某些信息。下表: 您向我提供了以下代码来获取播放器的名称: 效果很好,谢谢-我只想进行两项调整: 我想返回得分手的名字以及球员的名字。由于m.scorer是一个ID,如果p.Name已被映射到m.playerID,我如何将其映射到p.Name属性? 上面的查询返回所有玩家的姓名。我很快就会添加搜索功能,您可以在其中搜索特定玩家的所

  • 我们使用的是 Kafka 2.5.1 版本集群。最近注意到其中一个主题分区数据大小不均匀。与其余分区相比,一个特定分区的大小增加了 300%。这在群集中造成了不均衡的磁盘利用率。 已验证使用者滞后,看起来像其他分区一样正常 此外,我们使用默认分区程序和设置为默认值的“metadata.max.age.ms”配置,即 300000ms(5 分钟) 我们是如何使分区数据均匀分布的?

  • 我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?

  • 假设一个主题有3个kafka分区,我希望我的事件按小时窗口,使用事件时间。 当某个分区位于当前窗口之外时,kafka使用者是否会停止读取该分区?还是打开一个新窗口?如果它正在打开新的窗口,那么,如果一个分区的事件时间与其他分区相比会非常倾斜,那么从理论上讲,它不可能打开无限数量的窗口,从而耗尽内存吗?当我们重播一些历史时,这种情况尤其可能发生。 我一直试图从阅读留档中得到这个答案,但是在分区上找不