我试图编写一个流作业,它将数据流下沉到postgres表中。为了提供完整的信息,我的工作基于以下文章:https://tech.signavio.com/2017/postgres-flink-sink,这些文章建议使用JDBCoutputFormat。
98 ...
99 String strQuery = "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)";
100
101 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
102 .setDrivername("org.postgresql.Driver")
103 .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
104 .setQuery(strQuery)
105 .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR}) //set the types
106 .finish();
107
108 DataStream<Row> rows = FilterStream
109 .map((tuple)-> {
110 Row row = new Row(3); // our prepared statement has 3 parameters
111 row.setField(0, tuple.f0); // first parameter is case ID
112 row.setField(1, tuple.f1); // second paramater is tracehash
113 row.setField(2, f.format(tuple.f2)); // third paramater is tracehash
114 return row;
115 });
116
117 rows.writeUsingOutputFormat(jdbcOutput);
118
119 env.execute();
120
121 }
122 }
所以我的问题是:我错过了什么吗?我应该将插入的行提交到某个地方吗?
向你致意,伊格内修斯
正如Chesnay在他的评论中所说,你必须调整批处理间隔。
然而,这还不是故事的全部。如果您想获得至少一次的结果,您必须将批处理写与Flink的检查点同步。基本上,您必须将JDBCoutputFormat
包装在SinkFunction
中,该函数也实现了CheckPointedFunction
接口。当调用snapshotstate()
时,您已经将批处理写入数据库。您可以查看这个请求,它将在下一个版本中提供此功能。
问题内容: 有 DataFrame.to_sql 方法,但仅适用于mysql,sqlite和oracle数据库。我无法传递给此方法postgres连接或sqlalchemy引擎。 问题答案: 从pandas 0.14(2014年5月发行)开始,支持postgresql。该模块现在用于支持不同的数据库风格。您可以为PostgreSQL数据库传递sqlalchemy引擎(请参阅docs)。例如: 您是
问题内容: 我想将1到n层次结构作为邻接表存储到列出每个元素祖先的表中。我正在使用Postgres数据库(Postgres 10,但是要在其上部署代码的计算机运行Postgres 9.x)。 示例输入表(邻接表): 结果,我想要一个看起来像这样的表(仅显示了几行;此外,我要解决的现实问题有七个层次级别,而不是两个): 是元素的ID,是该元素在层次结构中所处的级别(0是根级别),是元素在各个级别上的
我试图在postgres表的numeric datatype列中插入一个日期值 在上面的查询中,col1是numeric类型。 我还可以使用value()子句插入一些值,使用select语句插入一些值吗?例如: 在上表中:1。col1即将到来序列2。col2和col3来自两个表的连接,即表B和表C 3。col4和col5是硬编码值 如何在一个查询中实现这一点? 通过两个表的连接进行插入,可按如下方
我正在尝试为ApacheFlink导入ScalaAPI流扩展,如中所述https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html 但是,我的ScalaIDE抱怨以下消息:对象扩展不是包的成员org.apache.flink.streaming.api.scala 我使用的是scala 2
Datastream Pro是一个数据库"浏览器"和数据操作工具.它易于使用,可靠,稳定,操作直观。Datastream Pro支持所有兼容JDBC的数据库(已经在Oracle,MySQL,postgreSQL和HSQLDb上测试成功).利用它可以浏览与编辑数据库中的数据,可在一个友好的界面中运行与编辑SQL脚本,可使用查询编辑器来编辑SQL查询,可同时连接到多个数据库和易于使用的连接向导等。
我想在Apache Flink中实现以下场景: 给定一个具有4个分区的Kafka主题,我想使用不同的逻辑在Flink中独立处理分区内数据,具体取决于事件的类型。 特别是,假设输入Kafka主题包含前面图像中描述的事件。每个事件具有不同的结构:分区1具有字段“a”作为关键字,分区2具有字段“b”作为关键字,等等。在Flink中,我希望根据事件应用不同的业务逻辑,所以我认为我应该以某种方式分割流。为了