当前位置: 首页 > 知识库问答 >
问题:

返回多列的Spark-Java UDF

嵇浩淼
2023-03-14

我使用的是sparkSql 1.6.2(Java API),我必须处理下面的DataFrame,其中包含两列中的值列表:

ID  AttributeName AttributeValue
 0  [an1,an2,an3] [av1,av2,av3]
 1  [bn1,bn2]     [bv1,bv2]

所需的表为:

ID  AttributeName AttributeValue
 0  an1           av1
 0  an2           av2
 0  an3           av3
 1  bn1           bv1
 1  bn2           bv2

我想我必须使用爆炸函数和自定义UDF函数的组合。

    null
 UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
        public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            return col1.apply(0) + col2.apply(0);
        }
    };

 context.udf().register("combineUDF", combineUDF, DataTypes.StringType);
StructType retSchema = new StructType(new StructField[]{
            new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
            new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
        }
    );

register(“combineUDF”,combineUDF,retSchema);

任何帮助都将非常感谢。

更新:我试图首先实现zip(AttributeName,AttributeValue),所以接下来我只需要在SparkSQL中应用标准的爆炸函数:

ID  AttName_AttValue
 0  [[an1,av1],[an1,av2],[an3,av3]]
 1  [[bn1,bv1],[bn2,bv2]]
UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
        public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            List<List<String>> zipped = new LinkedList<>();

            for (int i = 0, listSize = col1.size(); i < listSize; i++) {
                List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
                zipped.add(subRow);
            }

            return zipped;
        }

    };
myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);

有人帮忙吗?

共有1个答案

古彦
2023-03-14

最后,我设法得到了我想要的结果,但可能不是以最有效的方式。

基本上是两步:

  • 两个列表的zip
  • 按行分解列表
UDF2 concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() {
    public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
        ArrayList zipped = new ArrayList();

        for (int i = 0, listSize = col1.size(); i < listSize; i++) {
            String subRow = col1.apply(i) + ";" + col2.apply(i);
            zipped.add(subRow);
        }

        return scala.collection.JavaConversions.asScalaBuffer(zipped);
    }

};
sparkSession.udf().register("concatItems",concatItems,DataTypes.StringType);
DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue"));
ID  AttName_AttValue
 0  [[an1,av1],[an1,av2],[an3,av3]]
 1  [[bn1,bv1],[bn2,bv2]]

然后调用以下lambda函数将列表分解为行:

 DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row"));

在这个阶段,df3看起来如下所示:

ID  AttName_AttValue
 0  [an1,av1]
 0  [an1,av2]
 0  [an3,av3]
 1  [bn1,bv1]
 1  [bn2,bv2]

最后,为了将属性名称和值拆分为两个不同的列,我将DataFrame转换为JavaDD,以便使用map函数:

JavaRDD df3RDD = df3.toJavaRDD().map(
            (Function<Row, Row>) myRow -> {
                String[] info = String.valueOf(myRow.get(1)).split(",");
                return RowFactory.create(myRow.get(0), info[0], info[1]);
        }).cache();
 类似资料:
  • 火花UDF是否可能返回多个值?如果是这样,如何在数据框架API中访问各个项目。

  • 下面是一个提供两列结果的函数。 在这个函数中,有一个用于返回结果。 功能: 有没有可能不使用循环就返回行? 如果是这样的话,请与我分享我们如何做到这一点。 我是否能够编写一个函数,在不使用循环的情况下将记录插入到表中? 帮我解决这个问题。 提前谢谢。

  • 问题内容: 我有一个熊猫DataFrame ,。它包含一列“大小”,以字节为单位表示大小。我已经使用以下代码计算了KB,MB和GB: 我已经运行了超过120,000行,并且根据%timeit,每列花费的时间约为2.97秒* 3 =〜9秒。 无论如何,我可以使它更快吗?例如,我是否可以代替一次套用并运行3次而不是一次返回一列,而是可以一次通过返回所有三列以将其插入回原始数据帧吗? 我发现的其他问题都

  • 我用的是spark 2.1,脚本是pyspark。请帮我一下,因为我被困在这里了。 问题陈述:根据多列的条件创建新列 输入<code>数据帧<code>如下 现在我需要创建一个新列作为FLG,我的条件是如果

  • 5.3. 多返回值 在Go中,一个函数可以返回多个值。我们已经在之前例子中看到,许多标准库中的函数返回2个值,一个是期望得到的返回值,另一个是函数出错时的错误信息。下面的例子会展示如何编写多返回值的函数。 下面的程序是findlinks的改进版本。修改后的findlinks可以自己发起HTTP请求,这样我们就不必再运行fetch。因为HTTP请求和解析操作可能会失败,因此findlinks声明了2

  • 我有一个返回dict对象的函数,我想利用pandas/numpy在数据帧的每一行上为该函数执行列操作/向量化的能力。函数的输入在dataframe中指定,我希望函数的输出成为现有dataframe上的新列。下面是一个例子。 期望输出: 我读了这个答案,大部分内容都是这样的,但是当函数返回一个dict对象,其中包含所需的列名作为dict中的键时,我不太明白该怎么做。