当前位置: 首页 > 知识库问答 >
问题:

ApacheFlink:如何使用表API对每n行进行分组?

戚森
2023-03-14

最近我尝试使用ApacheFlink进行快速批处理。我有一个表,它有一个列:value和一个不相关的索引

基本上我想计算每5行值的平均值和范围。然后我将根据我刚才计算的平均值计算平均值和标准偏差。所以我想最好的方法是使用翻滚窗口。

看起来是这样的

DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
            .window(Tumble.over("5.rows").on({what should I write?}).as("w")
            .groupBy("w")
            .select("f0.avg, f0.max-f0.min");

{The next step is to use groupedTable to calculate overall mean and stdDev} 

但是我不知道用写什么。on()。我试过“proctime”,但它说没有这样的输入。我只希望它在从源代码读取时按顺序分组。但是它必须是一个时间属性,所以我不能使用索引列作为排序。

我是否必须添加时间戳才能执行此操作?在批处理过程中是否有必要这样做,是否会降低计算速度?解决这个问题的最好办法是什么?

更新:我试图在表API中使用滑动窗口,它让我异常。

// Calculate mean value in each group
    Table groupedTable = table
            .groupBy("f0")
            .select("f0.cast(LONG) as groupNum, f1.avg as avg")
            .orderBy("groupNum");

//Calculate moving range of group Mean using sliding window
    Table movingRangeTable = groupedTable
            .window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
            .groupBy("w")
            .select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");

例外是:

线程“main”java中出现异常。lang.UnsupportedOperationException:当前不支持在事件时间上计数滑动组窗口。

在org。阿帕奇。Flink。桌子计划节点。数据集。数据集设置为Waggreegate。createEventTimeSlidingWindowDataSet(DataSetWebindowaggregate.scala:456)

org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToplan(DataSetWindowAggregate.scala:139)

...

这是否意味着表API不支持滑动窗口?如果我没记错的话,DataSet API中没有窗口函数。那么如何计算批处理中的移动范围呢?

共有1个答案

华宇
2023-03-14

window子句用于定义基于窗口函数的分组,例如翻滚会话。表API(或SQL)中没有很好地定义每5行分组一次,除非指定行的顺序。这是在翻滚函数的on子句中完成的。由于此功能源自流处理,因此on子句需要一个timestamp属性。

您可以使用currentTimestamp()函数获取当前时间的时间戳。然而,我应该指出,Flink将对数据进行排序,因为它不知道函数的单调性。此外,所有这些都将使用1的并行性,因为没有允许分区的子句。

或者,您也可以实现用户定义的标量函数,该函数将index属性转换为时间戳(实际上是Long值)。但是同样,Flink将对数据进行完整的排序。

 类似资料:
  • 我想知道有没有人能帮我把一个较长的行分成几个较短的行,然后再把它们拆开? 在本例中,我有12列长的行,我希望将其分成更多的行和4列(请参阅stack_df)。 总体计划是按行合并()所有列,并将()突变为一列(9行x 1列,请参阅merge_df)。 之后,我希望将它们解压回一个大小为3行3列的数据帧(请参阅simple_df) 第3部分:解叠行(3行,3列)。(所需输出)

  • 问题内容: 所以我有这张桌子。它有几百行。每行中都有一个日期时间字段。我需要完成的是获取给定时间段内有多少行,而不是整个时间段,而是该时间段的每一天。到此为止,我知道该怎么办。但是,此外,我还需要在表中没有值0的日期的行。 因此,例如: 应该给我这样的结果: 任何人都可以帮忙吗? 问题答案: 为了处理带有0条对应记录的日期,我的常规做法是使用日历表进行联接。 例如,创建一个表,其中一个字段称为,并

  • 问题内容: 我阅读了Firestore文档以及Internet(stackoverflow)上有关Firestore分页的所有文章,但没有运气。我试图在文档中实现确切的代码,但是什么也没有发生。我有一个包含项目的基本数据库(超过1250个或更多),我想逐步获取它们。通过滚动以加载15个项目(到数据库中的最后一个项目)。 如果使用文档代码: 怎么做?文档没有太多细节。 PS:当用户滚动时,我需要使用

  • 我有一个object of days数组,我想对每个数组项进行单独的HTTP调用。但条件是当我得到前一个项目的成功响应时,只有我想做下一个项目的API请求,直到我想等待。假设我的数组中有5个项,所以它将立即调用API。我希望每个项目的API请求顺序。意味着当我的第一个items API请求响应为真时,第二个items API应该调用。 上面是显示API请求的片段。但我不明白如何检查我以前的响应&然

  • 问题内容: 所以,我有一张表,上面有这样的行: 每次扫描警报时(即每次触发或清除警报时)都会添加“已扫描的警报”行。任何警报都会添加带有特定Ev_Custom1的行。第一列Ev_Message包含一个计算机ID,该ID使我可以将警报与其他计算机分开。(您不喜欢任意的列名吗?)超过900条独特的警报消息。 我希望查询返回的内容是这样的: 这将是两个日期之间过滤的查询。我可以更改进入表的数据,但是有9

  • 我有一个这样的方法: 此方法需要以如下字符串形式返回3个最昂贵项目的产品ID:“item1,item2,item3”。我应该只能使用溪流,我被困在这里了。我应该能够按值对项目进行排序,然后获得产品ID,但我似乎无法使其正常工作。 编辑: 产品ID位于入口类中