当前位置: 首页 > 知识库问答 >
问题:

如何将整行传递给UDF-Spark DataFrame过滤器

柳胡媚
2023-03-14

我正在为具有大量内部结构的复杂JSON数据集编写过滤函数。传递单个列太麻烦了。

因此,我声明了以下UDF:

val records:DataFrame = = sqlContext.jsonFile("...")
def myFilterFunction(r:Row):Boolean=???
sqlc.udf.register("myFilter", (r:Row)=>myFilterFunction(r))

直觉上,我认为它会这样工作:

records.filter("myFilter(*)=true")

实际的语法是什么?

共有3个答案

李俊雅
2023-03-14

除了第一个答案。当我们希望将所有列传递给UDF时,我们可以使用

 struct("*")
祁永嘉
2023-03-14
scala> inputDF
res40: org.apache.spark.sql.DataFrame = [email: string, first_name: string ... 3 more fields]

scala> inputDF.printSchema
root
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)

现在,我想根据性别字段过滤行。我可以通过使用. filter($"性别" === "男性")来实现这一点,但我想使用. filter(函数)

因此,定义了我的匿名函数

val isMaleRow = (r:Row) => {r.getAs("gender") == "Male"}

val isFemaleRow = (r:Row) => { r.getAs("gender") == "Female" }

inputDF.filter(isMaleRow).show()

inputDF.filter(isFemaleRow).show()

我觉得这个需求可以用更好的方式完成,即不用声明为UDF并调用它。

暨弘毅
2023-03-14

在调用函数时,必须使用<code>struct()

导入行,

import org.apache.spark.sql._

定义 UDF

def myFilterFunction(r:Row) = {r.get(0)==r.get(1)} 

注册UDF

sqlContext.udf.register("myFilterFunction", myFilterFunction _)

创建数据帧

val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

使用UDF

records.filter(callUdf("myFilterFunction",struct($"text",$"text2"))).show

当u希望将所有列传递给UDF时。

records.filter(callUdf("myFilterFunction",struct(records.columns.map(records(_)) : _*))).show 

结果:

+------+------+
|  text| text2|
+------+------+
|sachin|sachin|
+------+------+
 类似资料:
  • 问题内容: 这是我的config.json: 这是我的bash命令: 输出: 因此$ PRJNAME是prj1,但是第一次调用仅输出。 有人能帮我吗? 问题答案: 您的示例中的jq程序实际上会尝试查找名为的键。请尝试以下操作:

  • 问题内容: 是否可以将参数传递给filter函数,以便您可以按任何名称进行过滤? 就像是 问题答案: 实际上,还有另一种(也许是更好的解决方案),您可以使用angular的本机“过滤器”过滤器,并且仍将参数传递给自定义过滤器。 考虑以下代码: 要进行此工作,您只需将过滤器定义如下: 如您在这里看到的,weDontLike实际上返回另一个函数,该函数的范围内有您的参数以及来自过滤器的原始项。 我花了

  • //有什么方法可以在我的自定义过滤器类中获得“admin,XY8382,basic” 我的筛选器类

  • 我在Scala的Spark数据框架中有一列,它是使用 我想将此列传递给UDF,以便进一步处理,以处理此聚合列中的一个索引。 当我将参数传递给我的UDF时: UDF-类型为Seq[Row]:val removeUnstableActivations:UserDefinedFunction=UDF((xyz:java.util.Date,def:Seq[Row]) 我收到错误: 我应该如何传递这些列,

  • 因此,在这里我将basePackages硬编码为“com.example”。这不是我想要的。我想通过命令行参数从终端运行应用程序时接收包名。有没有一种方法可以将命令行参数传递给应用程序,并使用我在BasePackages中接收到的参数?提前致谢

  • 问题内容: 我需要在页面的几个地方使用指令,它有时应包含完整列表,但有时应进行过滤。这是我的幼稚方法: HTML: Javascript: http://jsfiddle.net/GDfxd/14/ 当我尝试使用过滤器时,出现此错误: 有解决此问题的方法吗? 问题答案: 在 $消化迭代 错误通常发生在有改变模型中的守望者。在错误情况下,隔离绑定将绑定到过滤器的结果。该绑定创建了一个观察者。由于过滤