当前位置: 首页 > 知识库问答 >
问题:

在每个时间列传递时迭代Dataframe以执行转换

越霖
2023-03-14

我有一个包含100列和col名称的数据表,如col1、col2、col3....我想根据条件匹配对列的值应用某些转换。我可以将列名存储在字符串数组中。并在withColumn中传递数组的每个元素的值,并基于When条件i可以垂直转换列的值。但问题是,由于Dataframe是不可变的,所以每个更新的版本都需要存储在一个新的变量中,并且新的Dataframe需要传入withColumn以进行下一次迭代转换。是否有任何方法来创建数组的dataframe,以便新的dataframe可以存储为数组的一个元素,并可以根据迭代器的值进行迭代。或者有没有其他方法来处理同样的事情。

var arr_df : Array[DataFrame] = new Array[DataFrame](60)   

-->这将引发错误“未找到类型dataframe”

val df(0) = df1.union(df2)

for(i <- 1 to 99){
  val df(i) = df(i-1).withColumn(col(i), when(col(i)> 0, col(i) + 
   1).otherwise(col(i)))

scala> val original_df = Seq((1,2,3,4),(2,3,4,5),(3,4,5,6),(4,5,6,7),(5,6,7,8),(6,7,8,9)).toDF("col1","col2","col3","col4")
original_df: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]

scala> original_df.show()

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   4|
|   2|   3|   4|   5|
|   3|   4|   5|   6|
|   4|   5|   6|   7|
|   5|   6|   7|   8|
|   6|   7|   8|   9|
+----+----+----+----+

共有1个答案

有品
2023-03-14

检查下面的代码。

scala> df.show(false)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |4   |
|2   |3   |4   |5   |
|3   |4   |5   |6   |
|4   |5   |6   |7   |
|5   |6   |7   |8   |
|6   |7   |8   |9   |
+----+----+----+----+
scala>  val requiredColumns = df.columns.zipWithIndex.filter(_._2 < 3).map(_._1).toSet
requiredColumns: scala.collection.immutable.Set[String] = Set(col1, col2, col3)
scala> val allColumns = df.columns
allColumns: Array[String] = Array(col1, col2, col3, col4)
scala> val columnExpr = allColumns.filterNot(requiredColumns(_)).map(col(_)) ++ requiredColumns.map(c => when(col(c) > 3, col(c) + 1).otherwise(col(c)).as(c))
scala> df.select(columnExpr:_*).show(false)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |4   |
|2   |3   |5   |5   |
|3   |5   |6   |6   |
|5   |6   |7   |7   |
|6   |7   |8   |8   |
|7   |8   |9   |9   |
+----+----+----+----+
 类似资料:
  • 我想知道是否有更好的方法来查看Pyspark是否取得了进展(同时写入PL/SQL DB)。当前,当我的代码运行时,我看到的唯一输出是: 在做这一步的时候,很高兴看到pyspark的一些进展。

  • 我的模型逐渐变慢,速度不可接受(即从每秒200次滴答声到一次滴答声的几秒钟)。我想了解这个问题的原因。最简单的方法是什么来检查模型的哪个部分越来越耗费时间?我以前试过使用其他java探查器,但不好理解。

  • 我正在努力改进我写的一个数据传输程序。我在寻找如何让它更快的建议。我的程序通过填充ResultSet并将结果写入文件来从数据库(通常是Oracle11g)中提取数据。该程序定期查看表,并查询某个特殊列是否发生了更改。例如,这可能是这样一个查询:

  • 问题 你想同时迭代多个序列,每次分别从一个序列中取一个元素。 解决方案 为了同时迭代多个序列,使用 zip() 函数。比如: >>> xpts = [1, 5, 4, 2, 10, 7] >>> ypts = [101, 78, 37, 15, 62, 99] >>> for x, y in zip(xpts, ypts): ... print(x,y) ... 1 101 5 78 4

  • 问题内容: 我有一个列表,我想做的是嵌套循环 所需结果 我得到的结果 我希望将 foo 和 col中 的列表项一一注入到上面的shell脚本中。有没有一种方法可以将两个列表项一次传递到上面的shell脚本中? 我们可以做点什么 或使用for循环 引用我的Jenkinsfile }} 问题答案: 我相信转置是您要使用的方法,将两个列表配对,然后可以遍历结果: 更新 : 这就是我的目标。请注意,为简洁

  • 问题内容: 跟踪任务在gradle构建脚本中花费了多长时间的最优雅的方法是什么?在最佳情况下,将时间直接记录在任务名称的同一行或下一行 问题答案: 最干净的解决方案是实现TaskExecutionListener(我确定您可以处理该部分)并向进行注册。