当前位置: 首页 > 知识库问答 >
问题:

如何在不指定每一列的情况下将整行作为参数传递给Spark(Java)中的UDF?

巩枫
2023-03-14

我有这个java代码,其中火花UDF将行作为输入并返回行。还有一个广播变量,它是哈希映射。

UDF所做的就是检查广播HashMap是否包含rowKey,如果包含,则返回一个新行,其中包含来自输入行的一些现有值和来自广播HashMap的一些更新值。如果没有,则按原样返回输入行。我这样做是因为我想根据HashMap中的值更新行列值。这是代码:

广播变量

final Broadcast<HashMap<String, HashMap<String, String>>> broadcastVariable = jsc.broadcast(someHashMap);

UDF定义

UDF1<Row, Row> myUDF = new UDF1<Row, Row> () {
    @Override
    public Row call(Row inputRow) {

        String myKey = inputRow.getString(3);

        if (broadcastVariable.value().containsKey(myKey)){
            Map<String, String> valuesToUpdate = broadcastVariable.value().get(myKey);

            String col1 = inputRow.getString(0);
            String col2 = inputRow.getString(1);
            String col3 = inputRow.getString(2);

            for (Map.Entry<String, String> entry : valuesToUpdate.entrySet())
            {
                String columnName = entry.getKey();

                switch(columnName) {
                case "col1" :
                    col1 = entry.getValue();
                    break;
                case "col2" :
                    col2 = entry.getValue();
                    break;
                case "col3" :
                    col3 = entry.getValue();
                    break;
                }
            }
            return RowFactory.create(col1,col2,col3,myKey);

        }
        return inputRow;
    }
};

UDF注册

hiveContext.udf().register("myUDF", myUDF, DataTypes.createStructType(DF1.schema().fields()));

UDF来电

DataFrame DF2 = DF1.select(org.apache.spark.sql.functions.callUDF
                ("myUDF", org.apache.spark.sql.functions.struct(DF1.col("col1"),
                        DF1.col("col2"),
                        DF1.col("col3"),
                        DF1.col("myKey"))));

我有以下问题,

>

  • 如何将数据帧中的所有列传递给 UDF 而不逐个列出它们?我问这个问题的原因是实际的数据帧有超过50列。我看到了这个例子,但无法让它在Java中工作。

    有没有办法在UDF中按名称访问行列?现在我正在使用字符串(int)。

    UDF输出是一个名为myUDF的结构(struct(col1,col2,col3,myKey))。它变得很长,有50列。我怎么给它起别名呢?

    任何帮助是值得赞赏的!

  • 共有2个答案

    屈宏爽
    2023-03-14

    不需要提前知道列名!

    您可以将行类型作为udf的参数之一。例如:

    import org.apache.spark.sql.functions._
    
    val myUdf = udf((row: Row) => <here comes the code inside your udf>)
    

    您可以像这样调用udf:

    df。withColumn(newColumnName,myUdf(struct(df.columns映射col:_*))

    然后,您可以访问udf中的dataframe行(结构和数据),以获取所需的任何信息,例如,将该行转换为(column_name)的映射-

    val myUdf = udf((行: Row) =

    罗和煦
    2023-03-14

    TL;DR使用数据集。映射(并将UDF替换为map函数)。

    如何将dataframe中的所有列传递到UDF,而不一一列出它们?

    dataframe.schema.fieldNames
    

    请参见数据集API。

    有没有办法在UDF中按名称访问行列?

    引用Row.fieldIndex的大话:

    fieldIndex(name: String): Int返回给定字段名的索引。

    并使用索引

    它变得很长,有50列。我怎么给它起别名呢?

    看起来您的代码将从一些重构和组合中受益。在单个管道中使用 50 个字段可能会变得有点笨拙。

     类似资料:
    • 我正试图将dataframe列作为参数传递 但是得到错误 :33:错误:类型不匹配; 找到:org.apache.spark.sql.column 必需:int val df_new=df.withcolumn(“age_category”,ageclassification.agecategory(df(“age”))

    • 问题内容: 我试图将两个包含整数的列表作为参数传递给python代码。但是将参数作为字符串列表获取。 输入看起来像 我发现以下黑客可以转换列表。 有一个更好的方法吗? 问题答案: 命令行参数始终以字符串形式传递。您将需要自己将它们解析为所需的数据类型。 有像argparse和click这样的库,可以让您定义自己的参数类型转换,但将其视为相同, 因此我怀疑它是否有用。 编辑Jan 2019 这个答案

    • 问题内容: 我已经熟悉Android框架和Java,并希望创建一个通用的“ NetworkHelper”类,该类可以处理大多数联网代码,使我能够从中调用网页。 我遵循了来自developer.android.com的这篇文章来创建我的网络类:http : //developer.android.com/training/basics/network- ops/connecting.html 码:

    • 我正在编写一个方法,如果我想将一个类传递给一个方法,其中代码的一部分包括检查对象是否属于某种类型。这就是我想要的(但显然不行): 关于如何做到这一点有什么提示吗?谢谢!

    • 问题内容: 在Java中,如何将一个函数作为另一个函数的参数传递? 问题答案: Java 8及以上 如果你的类或接口只有一个抽象方法(有时称为SAM type),则使用Java 8+ lambda表达式,例如: 然后可以在使用MyInterface的任何地方替换lambda表达式: 例如,你可以非常快速地创建一个新线程: 并使用方法引用语法使其更加清晰: 如果没有 lambda表达式,则最后两个示

    • 问题内容: 我正在创建一种通过传递搜索字段从任何表中选择ID的方法。 但是我得到一个有关语法错误的MySqlException。当我查看“异常”消息时,它向我显示带引号的查询表!如何将表格作为不带引号的参数传递? 问题答案: 大多数数据库不允许您通过参数指定表名或列名。参数用于 值 。如果确实确实需要使它动态化,则应验证输入(它应该是一个已知的表名,并且该表中具有已知的列名),然后将其包括在SQL