当前位置: 首页 > 知识库问答 >
问题:

在scala中传递两列给udf?

宗政财
2023-03-14

我有一个包含两列的数据帧,一列是数据,另一列是该数据字段中的字符计数。

Data    Count
Hello   5
How     3
World   5

我想根据count列中的值更改列数据的值。如何实现这一点?我尝试使用一个udf:

invalidrecords.withColumn("value",appendDelimiterError(invalidrecords("value"),invalidrecords("a_cnt")))

这似乎是失败的,这是正确的做法吗?

共有1个答案

燕砚文
2023-03-14

这里有一个简单的方法

首先,您创建一个数据帧

import sqlContext.implicits._
val invalidrecords = Seq(
  ("Hello", 5),
  ("How", 3),
  ("World", 5)
).toDF("Data", "Count")

你应该有

+-----+-----+
|Data |Count|
+-----+-----+
|Hello|5    |
|How  |3    |
|World|5    |
+-----+-----+

然后,将udf函数定义

import org.apache.spark.sql.functions._
def appendDelimiterError = udf((data: String, count: Int) => "value with error" )

并且您使用< code>withColumn作为

invalidrecords.withColumn("value",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)

您应该将输出为

+-----+-----+----------------+
|Data |Count|value           |
+-----+-----+----------------+
|Hello|5    |value with error|
|How  |3    |value with error|
|World|5    |value with error|
+-----+-----+----------------+

您可以编写自己的逻辑,而不是从< code>udf函数返回一个字符串

编辑

在下面的评论中回答您的要求将需要您更改udf函数并使用列,如下所示

def appendDelimiterError = udf((data: String, count: Int) => {
  if(count < 5) s"convert value to ${data} - error"
  else data
} )

invalidrecords.withColumn("Data",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)

你应该有输出作为

+----------------------------+-----+
|Data                        |Count|
+----------------------------+-----+
|Hello                       |5    |
|convert value to How - error|3    |
|World                       |5    |
+----------------------------+-----+
 类似资料:
  • 我在Scala的Spark数据框架中有一列,它是使用 我想将此列传递给UDF,以便进一步处理,以处理此聚合列中的一个索引。 当我将参数传递给我的UDF时: UDF-类型为Seq[Row]:val removeUnstableActivations:UserDefinedFunction=UDF((xyz:java.util.Date,def:Seq[Row]) 我收到错误: 我应该如何传递这些列,

  • 我刚到UDF的斯帕克。我也看过这里的回答 问题陈述:我正在尝试从数据帧列中查找模式匹配。 例如:数据帧 现在我想对列$text中的每一行进行模式匹配,并添加一个名为count的新列。 结果: 我试图定义一个udf,将$text列作为数组[Seq[String]传递。但是我不能得到我想要的。 到目前为止,我尝试过: 任何帮助都将不胜感激

  • 我很好奇在Spark中把一个RDD传递给一个函数到底做了什么。 假设我们如上定义一个函数。当我们调用函数并传递一个现有的RDD[String]对象作为输入参数时,这个my_function是否将这个RDD作为函数参数进行“复制”?换句话说,是按引用调用还是按值调用?

  • 我有一个spark scala udf,它将一个参数作为dataframe的列,其他参数作为列表,但当我运行函数时,它抛出错误,指向列表参数为 请引导

  • 我有一个scala类a,其中有一个方法。 还有一个B类

  • 例如,我的mapper.xml文件中有以下xml片段: 如您所见,with订阅只有一列 我想传递2列给它,因此得到的代码,我们怎么做?