当前位置: 首页 > 知识库问答 >
问题:

如何使用 Scala 在 Apache Flink 中执行平均操作

程招
2023-03-14

我有一个这样的数据集

15,Rom,36,49
16,Weyoun,22,323
17,Odo,35,13
18,Jean-Luc,45,455

我想选择第3列和第4列作为我的键和值,我如何在Apache Flink中执行平均操作。

我最多能做到“按键分组”。但是我无法对每个键的值执行平均运算。

val lines: DataSet[String] = env.readTextFile("/home/kiran/Desktop/social_friends.csv")

val jn = lines.map(line => line.split(",")).map(word => (word(2).toString,word(3).toInt)).groupBy("0")

共有2个答案

石喜
2023-03-14

这应该有效

val lines: DataSet[String] = env.readTextFile("/home/kiran/Desktop/social_friends.csv")

val jn = lines.map(line => line.split(",")).map(word => (word(2).toString, 1,word(3).toDouble)).groupBy(0).reduce { 
    (left, right) => 
      val (key, left1, left2) = left
      val (_, right1, right2) = right
      (key, left1 + left2, right1 + right2)
}.map(tuple => (tuple._1, tuple._3 / tuple._2))
董永宁
2023-03-14

请注意,我也更改了map。它现在发出3元组:

scala prettyprint-override">val lines: DataSet[String] = env.readTextFile("/home/kiran/Desktop/social_friends.csv")

val jn = lines
  .map(line => line.split(","))
  .map(word => (word(2).toString,word(3).toInt,1))
  .groupBy("0")
  .reduce { (left, right) => (left._1, left._2 + right._2, left._3 + right._3) }
  .map(tuple => (tuple._1, tuple._2 / tuple._3))
 类似资料:
  • 我正在尝试计算Flink中输入数据流(无窗口)的平均值 输入数据流来自套接字连接,形式为“键值”,如“x 5”

  • 问题内容: 我使用的是在 GridFS中 存储了一个PDF文件( 大小 为 30mb) 。我能够轻松地执行插入,删除和查找操作。 数据在坚持和收藏。现在我要 更新 : 案例1:pdf文件 情况2:标题或作者 如何在GridFS中对 案例1 执行这些更新操作? 我知道我需要维护文件的多个版本并选择正确的版本。有人可以澄清一下吗? 编辑 : 我可以轻松更新元数据(标题,作者)。 问题答案: 在Grid

  • 问题内容: 我有下面的表格 我需要如下计算移动平均线 我尝试了什么 起初看起来很简单,但是当表上的数据膨胀时,它太慢了 有更快的方法吗? 问题答案: 您的查询是进行移动平均的一种方法: 替代方法是使用变量: 索引应进一步提高性能。

  • 问题内容: 如何在MySQL中的日期之间取平均值?我对时间值,小时和分钟更感兴趣。 在具有以下内容的桌子上: 进行如下查询: 编辑: 的作品,但我不知道它是什么返回数据。 问题答案: 这似乎有点骇人听闻,但适用于1970年〜1970年和2030年之间的日期(在32位元弓上)。您实际上是在将日期时间值转换为整数,对其求平均,然后将平均值转换回日期时间值。 可能有更好的解决方案,但这会帮助您紧要关头。

  • 问题内容: 我是SSH和JSch的新手。当我从客户端连接到服务器时,我想做两个任务: 上传文件(使用) 执行命令,例如创建目录,以及搜索MySQL数据库 目前,我正在使用两个单独的Shell登录名来执行每个任务(实际上我还没有开始对MySQL查询进行编程)。 对于上传,相关代码为 而对于我的命令 我应该在第一个频道之后断开会话,然后再打开第二个频道吗?还是完全关闭会话并打开一个新会话?如我所说,我

  • 问题内容: 我正在尝试从数据集中返回总计/平均值行,其中包含某些字段的SUM和其他字段的AVG。 我可以通过以下方式在SQL中执行此操作: 我将其转换为SQLAlchemy的尝试如下: 但这是错误的: 问题答案: 您应该使用类似: 您不能在这里使用,因为SqlAlchemy试图找到一个将函数结果放入的字段,但是它失败了。