当前位置: 首页 > 知识库问答 >
问题:

在Java-Spark中从DataFrame查找每天最大行程

越学义
2023-03-14

环境:Java1.8,VM Cloudera QuickStart。我有从csv文件到Hadoop hdfs的数据。每一行代表一条公共汽车路线。

    id      vendor    start_datetime   end_datetime    trip_duration_in_sec
    17534   A         1/1/2013 12:00   1/1/2013 12:14  840
    68346   A         1/1/2013 12:13   1/1/2013 12:18  300
    09967   B         1/1/2013 12:34   1/1/2013 12:39  300
    09967   B         1/1/2013 12:44   1/1/2013 12:51  420
    09967   A         1/1/2013 12:54   1/1/2013 12:56  120
   .........
   .........
1/1/2013 (Day 1) - Vendor A has 3 bus routes at 12:00-13:00 hour. (That time 12:00-13:00, vendor A had the most bus routes..)
1/1/2013 (Day 1) - Vendor B has 2 bus routes at 12:00-13:00 hour. (That time 12:00-13:00, vendor B had the most bus routes..)
....
import static org.apache.spark.sql.functions;
import static org.apache.spark.sql.Row;

Dataset<Row> ds;
ds.groupBy(functions.window(col("start_datetime"), "1 hour").count().show();

共有1个答案

华安民
2023-03-14

我对Java不太熟悉,所以我试着用Scala来解释它。

找出每个供应商每天最大路由的小时数的关键是按(vendor,day,hour)计数,然后按(vendor,day)聚合,计算每组最大cnt对应的小时数。每个记录的小时可以由start_datetime解析。

val df = spark.createDataset(Seq(
("17534","A","1/1/2013 12:00","1/1/2013 12:14",840),
("68346","A","1/1/2013 12:13","1/1/2013 12:18",300),
("09967","B","1/1/2013 12:34","1/1/2013 12:39",300),
("09967","B","1/1/2013 12:44","1/1/2013 12:51",420),
("09967","A","1/1/2013 12:54","1/1/2013 12:56",120)
)).toDF("id","vendor","start_datetime","end_datetime","trip_duration_in_sec")

df.rdd.map(t => {
    val vendor = t(1)
    val day = t(2).toString.split(" ")(0)
    val hour = t(2).toString.split(" ")(1).split(":")(0)
    ((vendor, day, hour), 1)
})
// count by key
.aggregateByKey(0)((x: Int, y: Int) =>x+y, (x: Int, y: Int) =>x+y) 
.map(t => {
    val ((vendor, day, hour), cnt) = t;
    ((vendor, day), (hour, cnt))
})
// solve the max cnt by key (vendor, day)
.foldByKey(("", 0))((z: (String, Int), i: (String, Int)) => if (i._2 > z._2) i else z)
.foreach(t => println(s"${t._1._2} - Vendor ${t._1._1} has ${t._2._2} bus routes from ${t._2._1}:00 hour."))
 类似资料:
  • 在一个14节点的Google Dataproc集群中,我有大约600万个名字,它们被两个不同的系统转换为ID:和。每个包含、和。我的目标是生成从到的映射,以便对于每个,对应的是附加到的所有名称中最常见的id。 让我们试着用一个例子来澄清一下。如果我有以下行: 我的目标是生成从到的映射。实际上,与关联的名称是、和,它们分别映射到、,因此是与关联的名称中最常见的映射。同样,将映射到。假设总会有赢家是可

  • 问题内容: 如何找到特定列的值 最大的行 ? 将为我提供每一列的最大值,我不知道如何获取对应的行。 问题答案: 使用熊猫功能。很简单: 或者,您也可以使用,例如-它提供相同的功能,并且至少与粗略观察中的显示速度一样快。 返回索引标签,而不是整数。 示例”:如果您将字符串值作为索引标签,例如行“ a”至“ e”,则可能想知道最大值出现在第4行(而不是“ d”行)。 如果您希望该标签在其中的整数位置,

  • 我试图找到矩阵中每列的最小值和最大值,但我当前的代码运行不正确。我试图把最小值放在一个新矩阵的第一行,最大值放在下一行,并对每一列这样做。任何帮助都将不胜感激,谢谢!

  • 我有一个RDD[标签点],我想找到标签的最小值和最大值,并应用一些转换,例如从所有这些标签中减去数字5。问题是我已经尝试了各种方法来获取标签,但没有任何工作正常。 如何仅访问 RDD 的标签和功能?有没有办法将它们作为列表[双精度]和列表[向量]例如? 我无法转到数据帧。

  • 我试图找出在Spark dataframe列中获得最大值的最佳方法。 考虑以下示例: 上面的每一个都给出了正确的答案,但在没有Spark分析工具的情况下,我无法判断哪一个是最好的。 就Spark运行时或资源使用而言,上述哪种方法最有效,或者是否有比上述方法更直接的方法,有任何来自直觉或经验主义的想法?