当前位置: 首页 > 知识库问答 >
问题:

Flink DataStream-在窗口上执行SQL查询,执行orderBy

唐高朗
2023-03-14
SELECT name, age, sum(days), avg(salary)
FROM employees
WHERE age > 25
GROUP BY name, age
ORDER BY name, age

下面是目前为止我所想到的伪代码。任何帮助都将不胜感激!谢谢!

employeesStream
.filter(new FilterFunction() ....)    \\ where clause
.keyby(nameIndex, ageIndex)           \\ group by??
.timeWindow(Time.seconds(10), Time.seconds(1))
.apply(new WindowFunction() ....)     \\ calculate average (and sum?)
// order by??

我检查了Table API,但对于流,似乎不支持很多操作,例如OrderBy。

共有1个答案

陶鸿畴
2023-03-14

流式订购不是一件微不足道的事。你想如何对永无止境的东西进行排序?在您的示例中,您希望计算一个平均值或一个和值,这只是每个窗口的一个值。不能对一个值进行排序。

另一种可能是缓冲所有值,等待一个完整性指示器开始排序。由于事件时间和水印,如果您知道您看到了直到某个时间的所有值(也称为水印),就可以对流进行排序。

事件时排序是最近引入的,它将成为Flink1.4表API的一部分。请参阅此处的示例。

 类似资料:
  • Query 也可以直接执行一个SQL查询,即Select命令。在Postgres中支持原始SQL语句中使用 ` 和 ? 符号。 sql := "select * from userinfo" results, err := engine.Query(sql) 当调用 Query 时,第一个返回值 results 为 []map[string][]byte 的形式。 Query 的参数也允许传

  • 大家好,我在joomla 2.5中为后端做了一个组件,但是我在执行sql查询时遇到了问题,我的变量是空的,所以它不会显示任何内容。 我有其他的文件和文件,但这里对我的问题很重要。 首先在我的controller.php我有这个内部管理文件 在我的模型文件我有restaurante.php 在我的控制器文件里我有这个 最后,在我的视图文件中,我有一个默认的tmpl文件。显示表格的php 但是元素re

  • 当用户按下ctrl键时,我试图在Chrome中保存HTML文件,但Chrome崩溃了。 (我只想下载HTML文件的源代码) 我读到它发生是因为我的文件比1.99M大... 在第一次尝试中(在我知道Chrome崩溃之前): 第二次尝试,在我读到崩溃后,我使用了: 这里我得到了错误: 我不知道,但我读到我需要将我的字符串编码为base 64:如何在JavaScript中将字符串编码为Base64? 有

  • 问题内容: 所涉及的rdms是oracle错误是 我想用这个查询做的是: 阈值表包含列阈值类型,其中包含现金交易表的列名称 对于阈值表中的每个记录,我们需要根据现金交易表中的阈值类型来比较总和(金额)组。 并将获取的数量与阈值表的threshold_amount比较 我需要选择threshold_id 阈值表: 现金交易表: 期望的输出: 让我们进行第一个提取:阈值表中的第一个记录 1.现在thr

  • 问题内容: 我有5个单独的SQL查询,这些查询按顺序在控制器操作中执行。这是我用来执行它们的方法: 因此,基本上我有五个连续的查询,这些查询必须按顺序执行,并且代码必须在它们下面继续执行。有没有更好的方法可以从控制器操作内部执行查询?我不确定使用当前方法进行错误处理的最佳方法。 谢谢! 问题答案: 首先看在控制器和数据库之间创建一个抽象层。存储库就是一个例子,它会在您进行测试时提供帮助。您可以创建

  • 问题内容: 我有一个名为“ df”的熊猫数据集。 我该如何做以下事情? 谢谢。 对于那些了解R的人,有一个名为sqldf的库,您可以在R中执行SQL代码,我的问题基本上是,是否在python中有像sqldf这样的库 问题答案: 这不应该做,您可以看一下包(与R中的一样) 更新2020-07-10 更新