当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:将DataStream写入Postgres表

吕嘉荣
2023-03-14

我试图编写一个流作业,它将数据流下沉到postgres表中。为了提供完整的信息,我的工作基于以下文章:https://tech.signavio.com/2017/postgres-flink-sink,这些文章建议使用JDBCoutputFormat。

98     ... 
99     String strQuery = "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)";
100
101     JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
102      .setDrivername("org.postgresql.Driver")
103      .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
104      .setQuery(strQuery)
105      .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR}) //set the types
106      .finish();
107
108     DataStream<Row> rows = FilterStream
109                 .map((tuple)-> {
110                    Row row = new Row(3);                  // our prepared statement has 3 parameters
111                    row.setField(0, tuple.f0);             // first parameter is case ID
112                    row.setField(1, tuple.f1);             // second paramater is tracehash
113                    row.setField(2, f.format(tuple.f2));   // third paramater is tracehash
114                    return row;
115                 });
116
117     rows.writeUsingOutputFormat(jdbcOutput);
118
119     env.execute();
120
121     }
122 }

所以我的问题是:我错过了什么吗?我应该将插入的行提交到某个地方吗?

向你致意,伊格内修斯

共有1个答案

壤驷骁
2023-03-14

正如Chesnay在他的评论中所说,你必须调整批处理间隔。

然而,这还不是故事的全部。如果您想获得至少一次的结果,您必须将批处理写与Flink的检查点同步。基本上,您必须将JDBCoutputFormat包装在SinkFunction中,该函数也实现了CheckPointedFunction接口。当调用snapshotstate()时,您已经将批处理写入数据库。您可以查看这个请求,它将在下一个版本中提供此功能。

 类似资料:
  • 问题内容: 有 DataFrame.to_sql 方法,但仅适用于mysql,sqlite和oracle数据库。我无法传递给此方法postgres连接或sqlalchemy引擎。 问题答案: 从pandas 0.14(2014年5月发行)开始,支持postgresql。该模块现在用于支持不同的数据库风格。您可以为PostgreSQL数据库传递sqlalchemy引擎(请参阅docs)。例如: 您是

  • 问题内容: 我想将1到n层次结构作为邻接表存储到列出每个元素祖先的表中。我正在使用Postgres数据库(Postgres 10,但是要在其上部署代码的计算机运行Postgres 9.x)。 示例输入表(邻接表): 结果,我想要一个看起来像这样的表(仅显示了几行;此外,我要解决的现实问题有七个层次级别,而不是两个): 是元素的ID,是该元素在层次结构中所处的级别(0是根级别),是元素在各个级别上的

  • 我试图在postgres表的numeric datatype列中插入一个日期值 在上面的查询中,col1是numeric类型。 我还可以使用value()子句插入一些值,使用select语句插入一些值吗?例如: 在上表中:1。col1即将到来序列2。col2和col3来自两个表的连接,即表B和表C 3。col4和col5是硬编码值 如何在一个查询中实现这一点? 通过两个表的连接进行插入,可按如下方

  • 我正在尝试为ApacheFlink导入ScalaAPI流扩展,如中所述https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html 但是,我的ScalaIDE抱怨以下消息:对象扩展不是包的成员org.apache.flink.streaming.api.scala 我使用的是scala 2

  • Datastream Pro是一个数据库"浏览器"和数据操作工具.它易于使用,可靠,稳定,操作直观。Datastream Pro支持所有兼容JDBC的数据库(已经在Oracle,MySQL,postgreSQL和HSQLDb上测试成功).利用它可以浏览与编辑数据库中的数据,可在一个友好的界面中运行与编辑SQL脚本,可使用查询编辑器来编辑SQL查询,可同时连接到多个数据库和易于使用的连接向导等。

  • 我想在Apache Flink中实现以下场景: 给定一个具有4个分区的Kafka主题,我想使用不同的逻辑在Flink中独立处理分区内数据,具体取决于事件的类型。 特别是,假设输入Kafka主题包含前面图像中描述的事件。每个事件具有不同的结构:分区1具有字段“a”作为关键字,分区2具有字段“b”作为关键字,等等。在Flink中,我希望根据事件应用不同的业务逻辑,所以我认为我应该以某种方式分割流。为了