当前位置: 首页 > 知识库问答 >
问题:

Flink SQL:在纯SQL语法中用时间戳连接表

蒋嘉颖
2023-03-14

我在使用Flink的SQL语法连接多个表时遇到了一些问题,其中至少有一个表具有时间属性列。

我有一个表 Table1,当 rowtime 用作 flink rowtime 时,它使用架构(id、value1、rowtime)。

我想将此表与使用模式(id, value 2)的表Table2连接。连接必须在匹配id时完成。

最后,我想通过使用滚动时间窗口对这个连接的结果进行分组。

仅仅使用SQL语法有可能做到这一点吗?

下面是我想做的一个例子:

SELECT 
    Table1.id as id, 
    TUMBLE_END(rowtime, INTERVAL '10' SECOND),
    MAX(value1) as value1,
    MAX(value2) as value2       
FROM Table1 JOIN TABLE2 ON Table1.id = Table2.id
GROUP BY Table1.id, TUMBLE(rowtime, INTERVAL '10' SECOND)

但它给了我以下错误:

2019-11-12 16:37:57.191 [main] ERROR - Cannot generate a valid execution plan for the given query: 

FlinkLogicalCalc(expr#0..6=[{inputs}], id=[$t0], EXPR$1=[$t4], value1=[$t1], value2=[$t2])
  FlinkLogicalWindowAggregate(group=[{0}], value1=[MAX($2)], value2=[MAX($3)])
    FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[0], proj#0..1=[{exprs}], value1=[$t3], value2=[$t3])
      FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
        FlinkLogicalTableSourceScan(table=[[Table1]], fields=[id, value1, rowtime], source=[KafkaTableSource(id, value1, rowtime)])
        FlinkLogicalTableSourceScan(table=[[Table2]], fields=[id, value2], source=[Table2_Type(id, value2)])

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalCalc(expr#0..6=[{inputs}], id=[$t0], EXPR$1=[$t4], value1=[$t1], value2=[$t2])
  FlinkLogicalWindowAggregate(group=[{0}], value1=[MAX($2)], value2=[MAX($3)])
    FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[0], proj#0..1=[{exprs}], value1=[$t3], value2=[$t3])
      FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
        FlinkLogicalTableSourceScan(table=[[kafkaDataStream]], fields=[id, value1, rowtime], source=[KafkaTableSource(id, value1, rowtime)])
        FlinkLogicalTableSourceScan(table=[[SensorConfigurationUpdateHTTP]], fields=[id, value2], source=[Table2_Type(id, value2)])

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
    at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:379)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
    at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
    ...

我还尝试将我的rowtime转换为TIMESTAMP类型(根据错误消息的建议),但我无法再处理时间窗口。它会导致以下错误:

2019-11-12 16:44:52.473 [main] ERROR - Window can only be defined over a time attribute column.
org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
    at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
    at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
    at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
    at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:813)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:379)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
    at org.apache.flink.table.api.Table.insertInto(table.scala:1126)

共有1个答案

沈俊明
2023-03-14

联接结果不能包含时间属性,因为联接不能保证时间戳的顺序得到保留。Flink假设两个表都是动态的,可以在任何时间点更改。表表2中的一条新记录可能与表格1的第一条记录连接,产生时间戳为“随机”顺序的结果。

可以通过向联接添加时态约束来更改此设置。您可以使用时间窗口联接定义查询,或者将 Table2 建模为时态表,然后将 Table1 与它联接。

 类似资料:
  • 问题内容: 我对编写sql很陌生,并且对联接有疑问。这是一个示例选择: 因此,假设我正在寻找所有嵌套在其中某个绿色小盒子的大盒子的名称。如果我理解正确,上述语法是获得与使用’join’关键字可获得的结果相同的另一种方法。 问题:上面的select语句对于正在执行的任务有效吗?如果没有,有什么更好的方法呢?语句是联接的语法糖还是它实际上在做其他事情? 如果您有任何与该主题相关的好材料的链接,我会很乐

  • 问题内容: 我可以看到例如在这里进行了几次讨论,但是我认为由于Elasticsearch中的重大更改,解决方案已过时。 我正在尝试将我在Kafka主题中的Json中的long / epoch字段转换为通过连接器推送的Elasticsearch日期类型。 当我尝试添加动态映射时,我的Kafka连接更新失败,因为我试图将两个映射应用于字段_doc和kafkaconnect。我认为这是关于版本6的重大更

  • 本文向大家介绍sqlite时间戳转时间语句(时间转时间戳),包括了sqlite时间戳转时间语句(时间转时间戳)的使用技巧和注意事项,需要的朋友参考一下 下面是具体的实现代码:

  • 我试图获得一个SQL语句来选择时间戳在两个给定时间戳之间的行。这是我试过的。(这是在Java servlet上,而我是Java时间戳对象。) 这里的from和to是timestamp对象这给了我一个语法错误,当我试图准备语句时,请帮助

  • 问题内容: 我正在编写一个SQL查询,其中涉及查找时间戳是否在特定的天数范围内。 我已经在PostgreSQL中编写了它,但是在Oracle和SQL Server中不起作用: 有没有通用的方法可以比较不同数据库之间的时间戳? 问题答案: 我不是SQL Server专家,但我知道它可以在Oracle和Postgres上运行,并且我怀疑它可能在MSSQL上运行,但没有办法对其进行ATM测试。 或者,如

  • 从我的数据库我检索值为: 我想要上面的: 我试着用当前的时间戳跟踪 实际时间戳:2018年6月22日星期五16:07:35 更新了,我不想更新,有没有办法保持原样?