当前位置: 首页 > 知识库问答 >
问题:

Flink-如何同时计算求和和和平均值?

敖涵容
2023-03-14

Flink(批处理/流式处理)中是否有方法同时计算字段的平均值和总和?使用聚合方法,我可以计算groupBy结果中字段的和,但如何同时计算平均值呢?下面的示例代码。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String,Integer,Double>> source = 
         env.readCsvFile(PathConfig.LINEITEM_1)
         .fieldDelimiter("|")
         types(String.class, Integer.class, Double.class);

source.groupBy(0,1).aggregate(Aggregations.SUM, 2);
//average of field 2???

共有1个答案

淳于嘉树
2023-03-14

对于CSV解析、分组和聚合等简单任务,我建议使用Flink的Table API。

如果您想使用更多的低级API,可以实现一个GroupReduce函数,该函数进行求和/计数(直到迭代器没有更多的元素),并在最后生成最终平均值。

 类似资料:
  • 我在添加数组的所有元素以及求取它们的平均值时遇到了问题。我将如何做到这一点并用我当前拥有的代码实现它?这些元素应该定义如下。

  • 我想用基于历史事件的流计算Flink中基于窗口的平均值(或我定义的任何其他函数),因此流必须是事件时间(而不是基于处理时间): 我已经了解了如何在摄入时添加时间戳: 但是当我进行计算(应用函数)时,当我只是以与没有EventTime时相同的方式进行计算时,它就不起作用了。我读过一些关于我必须设置的水印的东西: 有没有人举一个简单的Scala例子? 尊敬的安德烈亚斯

  • 问题内容: 我在添加数组的所有元素以及将它们取平均值时遇到了问题。我该怎么做,并用我现在拥有的代码实现它?这些元素应该定义如下。 问题答案: var sum = 0; for( var i = 0; i < elmt.length; i++ ){ sum += parseInt( elmt[i], 10 ); //don’t forget to add the base } 只需遍历数组,因为您的

  • 问题内容: 作为输入,我有一个带时间的CSV文件,每次都有一串数字。 我想输出按小时平均和总和分组的每小时表格: 到目前为止,我一直在看用字典来完成它,其中小时是一个关键,值是一个计数和总和的列表,然后将总和除以计数就可以得到平均值。我敢肯定,必须有一种更清洁的方法来做到这一点。也许有些图书馆可以使用它。有什么建议? 问题答案: 一个熊猫的解决方案: 印刷品: 另存为csv文件: 这是以下内容:

  • 是否可以对流进行求和、平均并将其转换为新对象。我有个目标 现在我想得到这个对象列表的平均值和总和(代码总和价格和代码平均价格) 然后我想创建一个新对象(页脚 这就是我现在所拥有的,它可以工作,但是我要通过两次流。我想要一个方法,我可以通过一次流来做到这一点。 有没有更好的方法做到这一点而不必重复这一点。谢谢

  • 我想画出每种语言的总和和平均数。我不确定我的代码是否符合我的要求。我在画这张图时出错了。我不知道我在哪里把编码搞砸了。我需要一些帮助。 这是错误消息