我使用的是sparkSql 1.6.2(Java API),我必须处理下面的DataFrame,其中包含两列中的值列表:
ID AttributeName AttributeValue
0 [an1,an2,an3] [av1,av2,av3]
1 [bn1,bn2] [bv1,bv2]
所需的表为:
ID AttributeName AttributeValue
0 an1 av1
0 an2 av2
0 an3 av3
1 bn1 bv1
1 bn2 bv2
我想我必须使用爆炸函数和自定义UDF函数的组合。
UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
return col1.apply(0) + col2.apply(0);
}
};
context.udf().register("combineUDF", combineUDF, DataTypes.StringType);
StructType retSchema = new StructType(new StructField[]{
new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
}
);
register(“combineUDF”,combineUDF,retSchema);
任何帮助都将非常感谢。
更新:我试图首先实现zip(AttributeName,AttributeValue),所以接下来我只需要在SparkSQL中应用标准的爆炸函数:
ID AttName_AttValue
0 [[an1,av1],[an1,av2],[an3,av3]]
1 [[bn1,bv1],[bn2,bv2]]
UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
List<List<String>> zipped = new LinkedList<>();
for (int i = 0, listSize = col1.size(); i < listSize; i++) {
List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
zipped.add(subRow);
}
return zipped;
}
};
myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);
有人帮忙吗?
最后,我设法得到了我想要的结果,但可能不是以最有效的方式。
基本上是两步:
UDF2 concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() {
public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
ArrayList zipped = new ArrayList();
for (int i = 0, listSize = col1.size(); i < listSize; i++) {
String subRow = col1.apply(i) + ";" + col2.apply(i);
zipped.add(subRow);
}
return scala.collection.JavaConversions.asScalaBuffer(zipped);
}
};
sparkSession.udf().register("concatItems",concatItems,DataTypes.StringType);
DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue"));
ID AttName_AttValue
0 [[an1,av1],[an1,av2],[an3,av3]]
1 [[bn1,bv1],[bn2,bv2]]
然后调用以下lambda函数将列表分解为行:
DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row"));
在这个阶段,df3看起来如下所示:
ID AttName_AttValue
0 [an1,av1]
0 [an1,av2]
0 [an3,av3]
1 [bn1,bv1]
1 [bn2,bv2]
最后,为了将属性名称和值拆分为两个不同的列,我将DataFrame转换为JavaDD,以便使用map函数:
JavaRDD df3RDD = df3.toJavaRDD().map(
(Function<Row, Row>) myRow -> {
String[] info = String.valueOf(myRow.get(1)).split(",");
return RowFactory.create(myRow.get(0), info[0], info[1]);
}).cache();
火花UDF是否可能返回多个值?如果是这样,如何在数据框架API中访问各个项目。
下面是一个提供两列结果的函数。 在这个函数中,有一个用于返回结果。 功能: 有没有可能不使用循环就返回行? 如果是这样的话,请与我分享我们如何做到这一点。 我是否能够编写一个函数,在不使用循环的情况下将记录插入到表中? 帮我解决这个问题。 提前谢谢。
问题内容: 我有一个熊猫DataFrame ,。它包含一列“大小”,以字节为单位表示大小。我已经使用以下代码计算了KB,MB和GB: 我已经运行了超过120,000行,并且根据%timeit,每列花费的时间约为2.97秒* 3 =〜9秒。 无论如何,我可以使它更快吗?例如,我是否可以代替一次套用并运行3次而不是一次返回一列,而是可以一次通过返回所有三列以将其插入回原始数据帧吗? 我发现的其他问题都
我用的是spark 2.1,脚本是pyspark。请帮我一下,因为我被困在这里了。 问题陈述:根据多列的条件创建新列 输入<code>数据帧<code>如下 现在我需要创建一个新列作为FLG,我的条件是如果
5.3. 多返回值 在Go中,一个函数可以返回多个值。我们已经在之前例子中看到,许多标准库中的函数返回2个值,一个是期望得到的返回值,另一个是函数出错时的错误信息。下面的例子会展示如何编写多返回值的函数。 下面的程序是findlinks的改进版本。修改后的findlinks可以自己发起HTTP请求,这样我们就不必再运行fetch。因为HTTP请求和解析操作可能会失败,因此findlinks声明了2
我有一个返回dict对象的函数,我想利用pandas/numpy在数据帧的每一行上为该函数执行列操作/向量化的能力。函数的输入在dataframe中指定,我希望函数的输出成为现有dataframe上的新列。下面是一个例子。 期望输出: 我读了这个答案,大部分内容都是这样的,但是当函数返回一个dict对象,其中包含所需的列名作为dict中的键时,我不太明白该怎么做。