当前位置: 首页 > 知识库问答 >
问题:

在 DataStream Flink 中从 JDBC 源读取数据的问题

袁泓
2023-03-14

我正在启动一个新的Flink应用程序,允许我的公司执行大量报告。我们有一个现有的遗留系统,我们需要的大部分数据都保存在SQL Server数据库中。在开始使用新部署的Kafka流中的更多数据之前,我们首先需要使用这些数据库中的数据。

我花了很多时间阅读Flink的书和网页,但我有一些简单的问题和假设,我希望你能帮助我进步。

首先,我想使用DataStream API,这样我们既可以使用历史数据,也可以使用实时数据。我不想使用DataSet API,但我也不认为使用SQL/Table API有什么意义,因为我更喜欢用Java类编写函数。我需要维护自己的状态,而数据流键控函数似乎是一条出路。

现在我正试图针对我们的生产数据库编写代码,我需要能够通过SQL查询读入“流”数据-似乎没有JDBC源连接器,因此我认为我必须自己进行JDBC调用,然后可能使用env.fromElements()创建数据源。显然,这是一个“有界”的数据集,但我怎么才能加载历史数据呢?在未来,我还想包括一个Kafka流,它只有几周的数据,所以我想我有时需要将SQL Server/Snowflake数据库中的数据与Kafka数据流中的实时数据流合并。这方面的最佳实践是什么,因为我没有看到讨论这一点的示例。

在从JDBC数据源检索数据时,我也看到了一些使用StreamingTableEnvironment的例子——我是不是应该用它来查询从JDBC连接到我的数据流函数中的数据?同样,我想用Java写函数,而不是用Flink SQL。如果我只使用DataStream API,那么使用StreamingTableEnvironment查询JDBC数据是否是最佳实践?

共有1个答案

范承教
2023-03-14

可以使用以下方法来读取数据库并创建数据流:

>

  • 您可以使用RichParallelSourceFunction对数据库进行自定义查询并从中获取数据流。可以在RichParallelSourceFunction类的扩展中启动带有JDBC的SQL驱动程序。

    使用Table DataStream API-可以通过创建JDBC曲库查询数据库,然后将其转换为流

    另一种选择,也许是更昂贵的解决方案 - 您可以使用Flink CDC连接器,它为Apache Flink提供源连接器,使用更改数据捕获(CDC)从不同的数据库引入更改

    然后你可以添加Kafka作为源,得到一个数据流。

    因此,简单地说,您的管道可以如下所示:您将两个源转换为数据流,您可以使用协进程函数连接这些流,这也将使您能够维护状态并在业务逻辑中使用它。最后,使用Sink函数将最终输出接收到数据库、Kafka甚至AWS S3存储桶。

  •  类似资料:
    • 问题内容: 我有一个MS-Access数据库,我正在使用JDBC(我认为是JDBC-ODBC桥)在Java中连接到该数据库。我的访问数据库有一些希伯来语值。 当我尝试使用String str = rs.getString(1)(rs是RowSet)读取这些值时,我得到的字符串只是一个问号字符串。 我还有希伯来语中的其他字符串,这些字符串是我在Java代码中使用字符串文字设置的,它们可以正常工作。所

    • 我的springboot应用程序尝试从两个数据源(emwbis和backupemwbis)读取数据。我按照下面的链接配置了我的springboot应用程序以从两个不同的数据源读取数据。 http://www.baeldung.com/spring-data-jpa-multiple-databases 我的应用程序当前的问题是,它总是从主数据源(emwbis)读取数据。我已经编写了以下代码。 主数

    • 问题内容: 这个问题的目的是记录: 在PySpark中使用JDBC连接读取和写入数据所需的步骤 JDBC源可能存在的问题以及已知的解决方案 只要稍作更改,这些方法就可以与其他支持的语言一起使用,包括Scala和R。 问题答案: 写数据 提交应用程序或启动Shell时,包括适用的JDBC驱动程序。您可以使用例如: 或结合和 也可以在启动JVM实例之前使用环境变量来设置这些属性,或使用set或/来设置

    • 我在从CSV文件读取数据以将值传递给请求时遇到问题。我有一个csv与3列用户id,密码和类型。当为username字段传递数据时,它采用3列的值,而不仅仅是username Jmeter version: 5.0 CSV 文件值:

    • 我需要从属性文件加载数据源属性 db.properties: 这是数据源(camelcontext.xml): 我试着这样做,但没用。 我的路由是在java dsl中实现的。

    • 在一个字符串中传递路径、用户名、密码和所有选项可以方便地创建到数据库的连接。有没有办法从一个URL创建数据源对象?也许可以从开放的SQL连接创建数据源?