当前位置: 首页 > 知识库问答 >
问题:

Kafka流拓扑不同的键,但相同的架构

孙玺
2023-03-14

我有一个Kafka Streams拓扑,其中我加入了5个表,每个表都是在一个主题上创建的,该主题由一些Kafka连接器填充,这些连接器产生KeyValue事件,其中Key是针对相同的Avro模式产生的,但在我的拓扑中,当我加入这些表时,Key似乎不一样,如果它们是Java等于事件。所有这些背后的原因是什么?

它与Confluent Schema Registry集成。

我们已经使用了调试器,并且在调试时看到在不同主题上收到的两个键,但具有相同的值是相等的。但与此同时,如果在主题B之上构建的商店中执行查找,并且该查找是在主题A上接收的键,则它将不匹配某些内容。

fun streamsBuilder(): StreamsBuilder {
    val streamsBuilder = StreamsBuilder()
    val productsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputWebshopProductsTopic)
    val productPricesStream = streamsBuilder.stream<Key, PriceVariantsHolder>(streamNameRepository.productsPricesStreamTopic)
    val productsRatingsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductRatingsTopic)
    val inputProductsStockStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductsStockTopic)

    val productsStockStream =
            inputProductsStockStream.map { key, value -> toKeyValue(key, productStockMapper.aStockQuantity(value)) }
    productsStockStream.to(streamNameRepository.productsStockStreamTopic)

    streamsBuilder.globalTable<Key, StockQuantity>(streamNameRepository.productsStockStreamTopic,
            Materialized.`as`(streamNameRepository.productsStockGlobalStoreTopic))

    val saleProductsTable = productsStream
            .filter { _, aggregate -> aggregate.payload != null }
            .map { key, aggregate -> toKeyValue(key, saleProductMapper.aSaleProduct(aggregate) { productsStockStore().get(Key(it)) }) }
            .mapValues { saleProduct -> log.debug("received $saleProduct"); saleProduct; }
            .groupByKey()
            .reduce({ _, saleProductAvro -> saleProductAvro }, Materialized.`as`(streamNameRepository.saleProductsStoreTopic))

    val productPricesTable = productPricesStream
            .map { key, aggregate -> toKeyValue(key, aggregate) }
            .groupByKey()
            .reduce({ _, price -> price }, Materialized.`as`(streamNameRepository.productsPricesStoreTopic))

    val productsRatingsTable = productsRatingsStream
            .map { key, aggregate -> toKeyValue(key, productRatingMapper.aProductRating(aggregate)) }
            .groupByKey()
            .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.productsRatingsStoreTopic))

    val productsStockTable = productsStockStream
            .map { key, aggregate -> toKeyValue(key, aggregate) }
            .groupByKey()
            .reduce { _, aggregate -> aggregate }

    val productsInNeedOfVariantStockUpdate = productsInNeedOfVariantStockUpdate(productsStockTable, saleProductsTable)

    saleProductsTable
            .outerJoin(productPricesTable, saleProductMapper::aPricedSaleProduct)
            .outerJoin(productsRatingsTable, saleProductMapper::aRatedSaleProduct)
            .outerJoin(productsStockTable, saleProductMapper::aQuantifiedSaleProduct)
            .outerJoin(productsInNeedOfVariantStockUpdate, saleProductMapper::aSaleProductWithUpdatedVariantStock)
            .toStream()
            .filter { _, saleProductAvro -> saleProductAvro.id != null }
            .mapValues { value -> log.debug("publish {}", value); value; }
            .to(streamNameRepository.outputSaleProductsTopic)

    return streamsBuilder
}

private fun <V> toKeyValue(key: Key, value: V): KeyValue<Key, V> {
    return KeyValue(Key.newBuilder(key).build(), value)
}

共有1个答案

韩琛
2023-03-14

如果您与融合模式注册中心集成,每个主题的神奇字节将是不同的,因此连接不会像预期的那样工作(因为键比较发生在字节级...)

这是意料之中的。这个问题偶尔会出现,在Kafka Streams中本地解决(即内置)很棘手,因为Confluent Schema注册表是第三方工具,而Kafka Streams应该与它无关。

不过也有一些解决方法。

一种解决方法是将我们在拓扑中收到的每个键重新映射到一个新的键,现在拓扑中的所有键都使用相同的Avro Schema生成(通过模式ID生成相同的Avro Schema)。

其他的选择(并不真的更好)是“剥离神奇的字节”或者为连接键使用不同的数据类型(例如,一些POJO)。因此,所有这些方法都是相似的。

 类似资料:
  • 我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于

  • 在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能

  • 我们运行一个集群工作线程应用程序,该应用程序依赖于 Kafka 使用高级消费者 API 使用消息。群集中的所有节点共享同一个使用者组。现在我们想要的是将该逻辑的一部分迁移到 Kafka 流处理器 API。这里的方法是什么?如果分配了相同的 groupId/clientId,流拓扑是否会与现有使用者就消息进行斗争?我们应该分配不同的 groupId/clientId 吗?流式传输拓扑?说“组”。 “

  • 我有个人课: 我有一个叫personList的人的名单: 现在我需要找到所有状态为“不活跃”的人,不管这个人是否有身份证。如果一个人没有身份证,但状态为“活跃”,也包括那个人。我正在使用Java流进行过滤: 流式传输后的结果是没有人被选中。第一个人没有被选中是因为它没有通过第二个过滤器,最后一个人没有被选中是因为它没有通过第一个过滤器。 我想我可以通过使用将两个过滤器合并成一个来修复它,但是我正在

  • 我有两个Kafka制作人向具有多个分区的同一主题发送消息。 正如预期的那样,来自同一生产者PR1的具有相同密钥K1的消息总是转到同一分区PA1。 问题是来自另一个生产者PR2的具有相同密钥K1的消息转到另一个分区PA2,而我希望它们也转到PA1。 Kafka不是在制片人之间保留分区分配吗? 是否与两个生产者使用不同的Kafka客户端库有关? 如果我设置两个制作人使用相同的id,会有帮助吗?