当前位置: 首页 > 知识库问答 >
问题:

如何在flink中将int列聚合到数组?

夔波
2023-03-14

我正在探索一种方法来实现这一点,就像下面的SQL一样。

SELECT a_tag,NEST(type) AS type_arr FROM a GROUP BY a_tag

NEST()是一个用户定义的函数int聚合到数组

但是我无法更改输出类型,因为UDF类扩展了AggramFunction

有什么建议吗?谢谢。


共有1个答案

夹谷苗宣
2023-03-14

Flink提供了一个内置的聚合函数,称为Collection(),whitch可以处理这种情况。

请注意,collect()的结果类型将是MULTISET

 类似资料:
  • 我有一个表示为的自定义状态计算,当我的看到来自Kafka的新事件时,它将不断更新。现在,每次更新状态时,我都希望将更新后的状态打印到stdout。想知道怎么在Flink中做到这一点吗?与所有的窗口和触发器操作很少混淆,我一直得到以下错误。 我只想知道如何将我的聚合流打印到stdout或写回另一个kafka主题? 下面是引发错误的代码片段。

  • 问题内容: 我只是在学习MySQL-是否有组合(或嵌套)聚合函数的方法? 给定一个查询: 这将给我每个用户回答的问题数量。我真正想要的是每个用户回答的平均问题数量…… 计算此统计信息的正确方法是什么? 如果有可能,是否有办法针对每个问题分解此统计信息?(用户可以多次回答相同的问题)。就像是: 问题答案: 您必须使用子查询: 您不能将一个聚合与另一个聚合一起包装。如果MySQL支持分析/排序/窗口功

  • 我在scala中有一个火花数据框,例如: URL列的数据范围很广,但浏览器列的数据有限。我希望在URL列上进行聚合,并在一个列表中以降序获得每个浏览器的最高计数,如下所示: 我一直在编写SQL,以使用窗口分区将计数作为每个浏览器的一个条目,但无法将其放入列表中。 这是一个运行Spark 2.4和Scala 2.11的google数据处理集群

  • 我正在尝试按照此处的步骤创建一个基本的 Flink 聚合 UDF。我已经添加了依赖项()并实现了 我已经实现了强制方法和其他一些方法:< code>accumulate,merge等。所有这些构建都没有错误。根据文件,我应该可以注册为 但是,似乎只需要作为输入。我收到一个不兼容的类型错误: 任何帮助都会很好。

  • 我在一个Apache Flink项目中遇到了以下情况。 具有不同对象的3个流,例如 Person->字符串id,字符串firstName,字符串lastName(即101,John,Doe) PersonDetail->字符串id,字符串地址,字符串城市,字符串电话号码,long personId(即99,Stefansplatz 1,+43066012345678,101) PersonAddD