当前位置: 首页 > 知识库问答 >
问题:

在 Flink 数据流中加入元数据的最佳方式是什么?

王才英
2023-03-14

我们有一个kafka事件流,我们希望使用一些驻留在MySQL DB中的元数据来丰富它。

元数据每隔几个小时就会发生变化。本质上,我们希望定期读取数据库,并使用新的元数据不断丰富事件。

一种方法是将广播状态与每隔几分钟/小时读取一次 DB 的定期源一起使用。广播此流并使用它来加入。但问题可能是广播流的首次读取可能晚于从Kafka Stream读取的某些消息。

有没有更好的方法?

共有1个答案

叶国兴
2023-03-14

您可以使用FlinkSQL。根据确切的要求,您可以对来自MySQL DB的CDC流进行时间版本化连接,或者对MySQL进行查找连接(可能启用了可选的缓存)。

参见https://github.com/ververica/flink-cdc-connectors.

更新:

如果您想使用数据流API,但担心在广播流中的相应数据可用之前可能会处理一些kafka消息,您可以:

  • 在扩充函数的open()方法中,对MySQL进行初始查询以预加载元数据
  • 如果在执行连接时广播数据仍然不可用,请使用在open()期间获取的数据,或使用一些硬连线到代码中的默认值

或者,可以使用状态处理器 API 引导广播状态的值。

 类似资料:
  • 我是Cassandra的新手,我正在努力弄清楚如何存储数据,以便能够并行执行快速读取。我听说分区数据会给性能带来问题?是否可以并行读取同一分区中Cassandra表中的数据?

  • 我有一个laravel命令,它搜索SQL Server数据库中的数据,并在MySQL数据库中插入每一行(如果已经存在,则更新)。 这是当前工作的代码: 首先,从SQL服务器搜索行: 然后插入和/或更新: 标签表定义如下: 对于每个块,填充数组和(给定每个块的行数,大约总共50秒)需要1/2秒,插入数据需要半秒 问题: 是否有更快的方法来填充数组? 由于客户端的限制,我们不能修改MySQL数据库中的

  • 我试图使用Java API将发送到,因为从开始,他们建议使用Java API而不是Scala API来获得更好的性能。 http://kafka.apache.org/090/documentation.html#ProducerAPI 我的XML文件有大约,我可以使用什么XML API来读取XML文件并转换为字符串,然后将该字符串发送到Kafka主题。 或者,我可以将XML转换为数据并将其发送给

  • 我有两个函数作为互斥锁和解锁: 我有这两个功能的原因是因为我不希望不同客户端之间的数据竞争,覆盖数据等等,这允许我锁定一段数据,修改它,然后解锁,以便其他客户端读取/更新。我没有使用postgres锁(表锁、行锁、咨询锁),因为我需要锁定多个事务的能力,而且因为我在程序中使用数据库池,所以我不能使用咨询锁,因为同时有多个会话连接到数据库,我无法控制哪一个锁/解锁 有没有更好的方法可以做到这一点(比

  • 在我的网站,我正在尝试加载产品项目而不刷新整个页面。我通过和设法做到了这一点。然而我遇到了一个问题。产品项包含单个图像。但由于我将间隔设置为3秒,它会重新下载所有需要的资源,我认为这是一个问题,因为连接速度慢的用户会感到恼火。我在开发工具上模拟了一个慢速连接,只是想看看会发生什么。 这就是结果。重新加载图像或整个product_item页面需要时间。 这是我的密码 现在我想知道什么是高效加载数据的