当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:如何在翻滚时间窗口中使用DISTINCT?

姜明贤
2023-03-14

我有这样一个流:

tableEnv.registerDataStream("userVisitPage", stream, "_time.rowtime, uri,userId");

然后我查询这个表:

final String sql =
       "SELECT tumble_start(_time, interval '10' second) as timestart, " +
       "  count(distinct userId) as uv, " +
       "  uri as uri, " +
       "  count(1) as pv " +
       "FROM userVisitPage " +
       "GROUP BY tumble(_time, interval '10' second), uri";

final Table table = tableEnv.sqlQuery(sql);

但是,查询会引发异常:

org.apache.flink.table.codegen.CodeGenException: Unsupported call: TUMBLE 
If you think this function should be supported, you can create an issue and start a discussion for it.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1006)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:234)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:321)
    at org.apache.flink.table.plan.nodes.CommonCalc$class.generateFunction(CommonCalc.scala:44)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.generateFunction(DataStreamCalc.scala:43)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:116)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:837)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:764)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:734)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:357)

如何实现此查询?


共有1个答案

松元明
2023-03-14
匿名用户

更新:Flink 1.6.0可用,支持流表上的DISTINCT聚合。

flink(1.4 . x版)尚不支持流表上具有< code>DISTINCT聚合的SQL查询。支持的目标是2018年年中之前不会发布的Flink 1.6。

但是,您可以实现一个用户定义的聚合函数来计算不同的计数,并在注册后在查询中使用该函数。查询语法当然会有所不同。

 类似资料:
  • 在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl

  • 我有一个使用flink应用程序的场景,该应用程序接收以下格式的数据流: {“event\u id”:“c1s2s34”,“event\u create\u timestamp”:“2019-03-07 11:11:23”,“amount”:“104.67”} 我使用下面的滚动窗口来查找过去60秒内输入流的总和、计数和平均值。 键值。时间窗口(时间秒(60)) 然而,我如何标记聚合结果,以便我可以说

  • 如何在ApacheFlink中为会话窗口分配id? 最后,我希望在会话窗口打开时,使用会话窗口id逐个充实事件(我不希望等到窗口关闭后再发出充实事件)。 我尝试使用AggregateFunction来实现这一点,但是我认为merge()并没有像我所期望的那样工作。它似乎是用于合并窗口而不是窗格(触发触发)。在我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态! 会话窗口ID将是落入窗口的

  • 任何人都知道如何使用时间偏移进行翻滚窗口-窗口大小为一天,时间偏移基于时区以小时为单位。 我找到了使用DataStream API执行此操作的示例,想知道如何使用Table API/SQL实现它。 下面是我使用DataStream API的代码。 提前谢谢。

  • 使用翻滚窗口的apache flink应用程序遇到问题。窗口大小是10秒,我希望每隔10秒有一个resultSet数据流。然而,当最新窗口的结果集总是延迟时,除非我将更多数据推送到源流。 例如,如果我在“01:33:40.0”和“01:34:00.0”之间将多条记录推送到源流,然后停止查看日志,则不会发生任何事情。 我在“01:37:XX”上再次推送一些数据,然后将在“01:33:40.0”和“0

  • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想