我在Scala中的DataFrame
中转置值时遇到问题。我的初始DataFrame
如下所示:
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| A| X| 6|null|
| B| Z|null| 5|
| C| Y| 4|null|
+----+----+----+----+
col1
和col2
是类型字符串
和
col3
,和col4
是Int
。
结果应该是这样的:
+----+----+----+----+------+------+------+
|col1|col2|col3|col4|AXcol3|BZcol4|CYcol4|
+----+----+----+----+------+------+------+
| A| X| 6|null| 6| null| null|
| B| Z|null| 5| null| 5| null|
| C| Y| 4| 4| null| null| 4|
+----+----+----+----+------+------+------+
这意味着三个新列应以
col1
、col2
和提取值的列命名。提取的值来自列col2
、col3
或col5
取决于哪个值不是null
。
那么如何实现这一点呢?我首先想到的是这样一个< code>UDF
:
def myFunc (col1:String, col2:String, col3:Long, col4:Long) : (newColumn:String, rowValue:Long) = {
if col3 == null{
val rowValue=col4;
val newColumn=col1+col2+"col4";
} else{
val rowValue=col3;
val newColumn=col1+col2+"col3";
}
return (newColumn, rowValue);
}
val udfMyFunc = udf(myFunc _ ) //needed to treat it as partially applied function
但我如何从数据帧以正确的方式调用它?
当然,上面的所有代码都是垃圾,还有更好的方法。因为我只是在处理第一个代码片段,所以让我知道…比较<code>Int已经不起作用了。
感谢任何帮助!谢谢!
好的,我有一个解决方法来实现我想要的东西。我执行以下操作:
(1)我使用< code>[newColumnName,rowValue]生成包含元组的新列。按照这个建议,从Spark数据帧中的单个列派生多个列
case class toTuple(newColumnName: String, rowValue: String)
def createTuple (input1:String, input2:String) : toTuple = {
//do something fancy here
var column:String= input1 + input2
var value:String= input1
return toTuple(column, value)
}
val UdfCreateTuple = udf(createTuple _)
(2) 将函数应用于数据帧
dfNew= df.select($"*", UdfCreateTuple($"col1",$"col2").alias("tmpCol")
(3) 创建具有不同值 new列名
的数组
val dfDistinct = dfNew.select($"tmpCol.newColumnName").distinct
(4)创建具有不同值的数组
var a = dfDistinct.select($"newCol").rdd.map(r => r(0).asInstanceOf[String])
var arrDistinct = a.map(a => a).collect()
(5)创建键值映射
var seqMapping:Seq[(String,String)]=Seq()
for (i <- arrDistinct){
seqMapping :+= (i,i)
}
(6) 将映射应用于原始数据帧,参见基于另一列将值映射到特定列
val exprsDistinct = seqMapping.map { case (key, target) =>
when($"tmpCol.newColumnName" === key, $"tmpCol.rowValue").alias(target) }
val dfFinal = dfNew.select($"*" +: exprsDistinct: _*)
好吧,这有点麻烦,但我可以在不知道有多少列的情况下导出一组新列,同时将值转移到新列中。
有一种更简单的方法:
val df3 = df2.withColumn("newCol", concat($"col1", $"col2")) //Step 1
.withColumn("value",when($"col3".isNotNull, $"col3").otherwise($"col4")) //Step 2
.groupBy($"col1",$"col2",$"col3",$"col4",$"newCol") //Step 3
.pivot("newCol") // Step 4
.agg(max($"value")) // Step 5
.orderBy($"newCol") // Step 6
.drop($"newCol") // Step 7
df3.show()
步骤如下:
感谢@user8371915,他首先帮助我回答了自己的支点问题。
结果如下:
+----+----+----+----+----+----+----+
|col1|col2|col3|col4| AX| BZ| CY|
+----+----+----+----+----+----+----+
| A| X| 6|null| 6|null|null|
| B| Z|null| 5|null| 5|null|
| C| Y| 4| 4|null|null| 4|
+----+----+----+----+----+----+----+
您可能需要尝试将列标题字符串串联起来,以获得正确的结果。
我有一个如下所示的数据框架 我写了一个UDF来将分类转换为二进制和 我将此应用于数据帧,如下所示 如何将多个列传递到 UDF 中,这样我就不必对其他分类列重复自己?
现在,我想在一个函数中使用这个,如下所示- 然后使用此函数在我的DataFrame中创建一个新列 总之,我希望我的列“new_col”是一个类型数组,其值为[[x,x,x]] 我得到以下错误。我在这里做错了什么? 原因:java.lang.UnsupportedOperationException:不支持org.apache.spark.sql.Column类型的模式
下面有以下dataframe架构
我想根据Scala映射所表示的配置来转换数据帧中的一些列。 我有 2 个案例: < li >接收映射< code>Map[String,Seq[String]]和列col1、col2,以转换col3(如果在key = col1的映射中存在实体,并且col2在此实体值列表中)。 < li >接收map 和col1,col2,以在Map中存在具有key = col1的实体并且col2在由Long元组描
您可以从以下链接下载生成的.json示例:https://aiaccqualitytelcapture.blob.core.windows.net/streamanalytics/2019/08/21/10/0_43cbc7b0c9e845a187ce182b46eb4a3a_1.json? 特别是,您可以看到其中每一个中的实际数据实际上都是一个字典:我们感兴趣的“features”列的形式如下:
我正在使用pyspark,用spark-csv将一个大型csv文件加载到dataframe中,作为预处理步骤,我需要对其中一列(包含json字符串)中的可用数据应用各种操作。它将返回X个值,每个值都需要存储在它们自己单独的列中。 该功能将在UDF中实现。但是,我不确定如何从该UDF返回一个值列表,并将其输入到各个列中。下面是一个简单的例子: 生成以下内容: