当前位置: 首页 > 知识库问答 >
问题:

火花SQL:如何使用JAVA从DataFrame操作调用UDF

糜正业
2023-03-14

我想知道如何使用JAVA从SparkSQL中的领域特定语言(DSL)函数调用UDF函数。

我有UDF函数(仅举例):

UDF2 equals = new UDF2<String, String, Boolean>() {
   @Override
   public Boolean call(String first, String second) throws Exception {
       return first.equals(second);
   }
};

我已经注册到sqlContext了

sqlContext.udf().register("equals", equals, DataTypes.BooleanType);

当我运行下面的查询时,我的UDF被调用,我得到一个结果。

sqlContext.sql("SELECT p0.value FROM values p0 WHERE equals(p0.value, 'someString')");

我将使用Spark SQL中特定于域的语言的函数转换此查询,但我不确定如何进行转换。

valuesDF.select("value").where(???);

我发现存在调用 UDF() 函数,其中其参数之一是函数 fnctn 而不是 UDF2。如何使用 UDF 和 DSL 中的函数?

共有3个答案

单嘉泽
2023-03-14

这是工作代码示例。它适用于Spark 1.5. x和1.6. x。从管道变压器中调用UDF的技巧是使用DataFrame上的sqlContext()来注册您的UDF

@Test
public void test() {
    // https://issues.apache.org/jira/browse/SPARK-12484
    logger.info("BEGIN");

    DataFrame df = createData();        
    final String tableName = "myTable";
    sqlContext.registerDataFrameAsTable(df, tableName);

    logger.info("print schema");
    df.printSchema();
    logger.info("original data before we applied UDF");
    df.show();

    MyUDF udf = new MyUDF();
    final String udfName = "myUDF";
    sqlContext.udf().register(udfName, udf, DataTypes.StringType);

    String fmt = "SELECT *, %s(%s) as transformedByUDF FROM %s";
    String stmt = String.format(fmt, udfName, tableName+".labelStr", tableName); 
    logger.info("AEDWIP stmt:{}", stmt);
    DataFrame udfDF = sqlContext.sql(stmt);
    Row[] results = udfDF.head(3);
    for (Row row : results) {
        logger.info("row returned by applying UDF {}", row);
    }

    logger.info("AEDWIP udfDF schema");
    udfDF.printSchema();
    logger.info("AEDWIP udfDF data");
    udfDF.show();


    logger.info("END");
}

DataFrame createData() {
    Features f1 = new Features(1, category1);
    Features f2 = new Features(2, category2);
    ArrayList<Features> data = new ArrayList<Features>(2);
    data.add(f1);
    data.add(f2);
    //JavaRDD<Features> rdd = javaSparkContext.parallelize(Arrays.asList(f1, f2));
    JavaRDD<Features> rdd = javaSparkContext.parallelize(data);
    DataFrame df = sqlContext.createDataFrame(rdd, Features.class);
    return df;
}

class MyUDF implements UDF1<String, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(String s) throws Exception {
        logger.info("AEDWIP s:{}", s);
        String ret = s.equalsIgnoreCase(category1) ?  category1 : category3;
        return ret;
    }
}

public class Features implements Serializable{
    private static final long serialVersionUID = 1L;
    int id;
    String labelStr;

    Features(int id, String l) {
        this.id = id;
        this.labelStr = l;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getLabelStr() {
        return labelStr;
    }

    public void setLabelStr(String labelStr) {
        this.labelStr = labelStr;
    }
}

this is the output

+---+--------+
| id|labelStr|
+---+--------+
|  1|   noise|
|  2|     ack|
+---+--------+

root
 |-- id: integer (nullable = false)
 |-- labelStr: string (nullable = true)
 |-- transformedByUDF: string (nullable = true)

+---+--------+----------------+
| id|labelStr|transformedByUDF|
+---+--------+----------------+
|  1|   noise|           noise|
|  2|     ack|          signal|
+---+--------+----------------+
柳才良
2023-03-14

当查询数据帧时,您应该能够使用如下代码执行UDF:

sourceDf.filter(equals(col("columnName"), "someString")).select("columnName")

其中 col(“列名”)是要比较的列。

岳池暝
2023-03-14

我找到了一个我一半满意的解决方案。可以将UDF作为列条件调用,例如:

valuesDF.filter("equals(columnName, 'someString')").select("columnName");

但是我仍然想知道是否可以直接调用UDF。

编辑:

顺便说一下,可以直接调用udf,例如:

df.where(callUdf("equals", scala.collection.JavaConversions.asScalaBuffer(
                        Arrays.asList(col("columnName"), col("otherColumnName"))
                    ).seq())).select("columnName");

导入组织。火花。sql.函数是必需的。

 类似资料:
  • 类似的问题,但没有足够的观点来评论。 根据最新的Spark文档,< code>udf有两种不同的用法,一种用于SQL,另一种用于DataFrame。我找到了许多关于如何在sql中使用< code>udf的例子,但是还没有找到任何关于如何在数据帧中直接使用< code>udf的例子。 o.p.针对上述问题提供的解决方案使用,这是,将根据Spark Java API文档在Spark 2.0中删除。在那

  • 问题内容: 与此处类似的问题,但在此处没有足够的评论要点。 根据最新的Spark 文档,可以两种不同的方式使用,一种用于SQL,另一种用于DataFrame。我找到了多个如何与sql 一起使用的示例,但还没有找到有关如何直接在DataFrame上使用a的任何示例。 op所提供的解决方案,在上面链接的问题上使用,根据Spark Java API文档,该解决方案将在Spark 2.0中删除。在那里,它

  • 我正在尝试从这个Scala代码写入csv文件。我使用HDFS作为临时目录,然后writer.write在现有子文件夹中创建一个新文件。我收到以下错误消息: java.io./tfsdl-ghd-wb/raidnd/Incte_19 如果我选择新建文件或退出文件,也会发生同样的情况,我已经检查了路径是否正确,只想在其中创建一个新文件。 问题是,为了使用基于文件系统的源写入数据,您需要一个临时目录,这

  • 我需要从JavaScript调用操作过程。我的操作接受2个输入参数和1个输出参数。下面是我的行动截图 有时还说 请求头字段access-control-allog-headers不允许access-control-allog-headers

  • 当我使用spark-submit with master yarn和deploy-mode cluster提交spark作业时,它不会打印/返回任何applicationId,一旦作业完成,我必须手动检查MapReduce jobHistory或spark HistoryServer来获取作业细节。 我的集群被许多用户使用,在jobHistory/HistoryServer中找到我的作业需要很多时

  • 有一个spark_df有许多重复如下: 现在我想将这个spark_df转换如下: 我在熊猫身上知道这一点。但是我正在努力学习火花,这样我就可以把它实施到大数据中。如果有人能帮忙,那就太好了。