当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams:为执行KTable-KTable外键连接的应用程序增加主题分区

姬振濂
2023-03-14

我找到的大部分信息都与主键连接有关。我知道外键连接对于Kafka流来说是一个相对较新的功能。我对它的规模感兴趣。我知道Kafka Streams并行性受到每个主题上的分区数的限制,但是我对增加输入主题分区意味着什么有一些疑问。

  • 外键联接对共分区输入主题有相同的要求吗?也就是说,两个主题是否需要具有相同数量的分区
  • 在应用程序在生产环境中运行数月或数年后,如何添加分区?支持每个KTable的changelog主题存储来自特定输入主题分区的数据。如果要增加输入主题中的分区,这会如何影响我们的KTables的状态存储和变更日志?据推测,我们不能重新开始并丢失这些数据,因为这些数据已经积累了数月或数年,对于执行连接至关重要。它可能不会很快被上游数据取代。我们是否需要取消状态存储,创建新的输入主题,并将所有KTable changelog主题数据重新发送给它们
  • 其他内部“订阅”主题如何

共有1个答案

徐知
2023-03-14

外键连接对共同分区输入主题有相同的要求吗?也就是说,两个主题需要有相同数量的分区吗?

不,更多细节请查看https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/

在应用程序在生产环境中运行数月或数年后,如何添加分区?

即使不使用Kafka Streams,您也不能这样做。问题是,您的输入数据是按键分区的,如果您添加分区,输入主题中的分区会中断。-推荐的模式是创建一个具有不同数量分区的新主题。

支持每个KTable的changelog主题存储来自特定输入主题分区的数据。如果要增加输入主题中的分区,这会如何影响我们的KTables的状态存储和变更日志?

这会破坏应用程序。事实上,如果Kafka Streams检测到输入主题分区的数量与changelog主题分区的数量不匹配,它将进行检查并引发异常。

 类似资料:
  • Confluent网站的留档提到以下内容: 左侧KTable可以有多条记录,这些记录映射到右侧KTable上的同一个键。如果右KTable中存在相应的键,则对单个左KTable条目的更新可能会导致单个输出事件。因此,对右KTable条目的单个更新将导致对左KTable中具有相同外键的每个记录进行更新。 查看下面的示例说明: 根据解释,如果是内部联接,则右侧的应该触发左侧的两条记录,这两条记录将被添

  • 我正在尝试执行 KTable-KTable 外键联接,但我收到一个错误,因为 Kafka 流正在尝试对外键使用字符串 serde。 我希望它使用Kotlinx序列化服务器。如何指定? 我想使用FK选择器将两个KTables的数据连接在一起,并将值重新映射到一个聚合对象中。 然而,我得到一个错误,因为Kafka Streams正在使用(我的默认Serde)用于反序列化外键。但它是一个JSON对象,我

  • 我正在尝试通过键连接两个(无窗口)并将结果写入

  • 对于我的应用程序,我使用KTable-Ktable连接,这样无论何时在主数据流或子数据流上接收数据,它都可以为所有三个表设置带有setters和getters的复合对象。这三个传入流具有不同的键,但是在创建KTable时,我为所有三个KTable设置相同的键。 我有一个分区的所有主题。当我在单个实例上运行应用程序时,一切都运行良好。我可以看到compositeObject填充了所有三个表中的数据。

  • 请参阅下面的更新以显示潜在的解决方案 我们的应用程序使用2个主题作为KTables,执行左连接,并输出到一个主题。在测试过程中,我们发现当我们的输出主题只有一个分区时,这项功能可以正常工作。当我们增加分区的数量时,我们注意到生成到输出主题的消息数量减少了。 在启动应用程序之前,我们用多个分区配置测试了这一理论。使用1个分区,我们可以看到100%的消息。使用2,我们可以看到一些消息(少于50%)。对

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我