当前位置: 首页 > 知识库问答 >
问题:

用Kafka流/KSQL连接表?

丌官嘉良
2023-03-14

我正在导入一个DB,其中包含一些表示多对多和一对多关系的链接表。

1-到目前为止,根据我对Kafka流的理解,我似乎需要为每个链接表提供一个流,以便执行聚合。KTable将不可用,因为记录是按键更新的。但是,聚合的结果可能是Ktable中的一个。

2-然后是外键上的连接问题。似乎唯一的方法是通过GlobalKtable。link-table-topic->link-table-stream->link-tableglobaktable。这可能会导致大量磁盘空间的使用,因为我的表非常大。这是一个包含大量表超大型DB,在数据上构建多个逻辑视图的需求是项目的核心部分,无法避免。

a)我现在就明白了吗?

听起来唯一存在的东西是KStream-to-GlobalKTable,似乎我需要把事情颠倒一下。我的原始DB生物测定表,需要变成一个流,而我的链接文档表,需要变成一个流,首先用于聚合,然后是一个GlobalKTable用于连接。

无论哪种方式,除非我的流只有一个分区,否则这可能非常昂贵。

共有1个答案

贺善
2023-03-14

几个月前,我碰巧用Kafka Streams开发了一个类似的用例,我很高兴与大家分享我的经验。

按照您的建议使用KStreams-to-KTable会很管用,尽管有一些您可能无法接受的警告。

首先,回想一下,流到表的联接只有在流端(而不是ktable端)接收到新事件时,才由Kafka流更新。

https://svend.kelesia.com/one-to-many-kafka-streams-ktable-join.html

使用该技术,我能够正确地从DB导入一对多和多对多的关系。

 类似资料:
  • 我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据: 我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。 这是我的映射csv文件: 所以我的输出最好是这样的 该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。 我目前有一个不适合我的代码: 代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中

  • 当kafka streams应用程序运行且kafka突然停机时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当kafka恢复时,一切(理论上)都应该恢复正常。我正在尝试获取有关此情况的警报,但我无法找到捕获该警报的位置并发送日志/度量。我尝试了以下方法: 但这只发生在异常情况下,而不是这里 扩展并将属性更改为我的类,从而扩展了此接口。 我知道Kafka有自己的衡量标准,我可

  • 我们已经成功地使用了MySQL - 使用jdbc独立连接器的kafka数据摄取,但现在在分布式模式下使用相同的连接器(作为kafka connect服务)时面临问题。 用于独立连接器的命令,工作正常 - 现在,我们已经停止了这一项,并以分布式模式启动了kafka connect服务,如下所示 2 个节点当前正在运行具有相同连接服务。 连接服务已启动并正在运行,但它不会加载 下定义的连接器。 应该对