我有一个用斯卡拉写的UDF,我希望能够通过Pyspark会话调用它。UDF 采用两个参数:字符串列值和第二个字符串参数。我已经能够成功地调用UDF,如果它只需要一个参数(列值)。如果需要多个参数,我很难调用UDF。以下是到目前为止我在斯卡拉和Pyspark中能够做的事情:
Scala UDF:
class SparkUDFTest() extends Serializable {
def stringLength(columnValue: String, columnName: String): Int =
LOG.info("Column name is: " + columnName)
return columnValue.length
}
在Scala中使用它时,我已经能够注册和使用这个UDF:
Scala主类:
val udfInstance = new SparkUDFTest()
val stringLength = spark.sqlContext.udf.register("stringlength", udfInstance.stringLength _)
val newDF = df.withColumn("name", stringLength(col("email"), lit("email")))
以上工作成功。下面是Pyspark的尝试:
def testStringLength(colValue, colName):
package = "com.test.example.udf.SparkUDFTest"
udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().stringLength().apply
return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column), colName))
在皮斯帕克中呼叫 UDF:
df.withColumn("email", testStringLength("email", lit("email")))
在Pyspark中执行上述操作并进行一些调整会给我以下错误:
py4j.Py4JException: Method getStringLength([]) does not exist
or
java.lang.ClassCastException: com.test.example.udf.SparkUDFTest$$anonfun$stringLength$1 cannot be cast to scala.Function1
or
TypeError: 'Column' object is not callable
我能够修改UDF以只取一个参数(列值),并且能够成功地调用它并取回一个新的Dataframe。
斯卡拉UDF级
class SparkUDFTest() extends Serializable {
def testStringLength(): UserDefinedFunction = udf(stringLength _)
def stringLength(columnValue: String): Int =
LOG.info("Column name is: " + columnName)
return columnValue.length
}
更新蟒蛇代码:
def testStringLength(colValue, colName):
package = "com.test.example.udf.SparkUDFTest"
udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength().apply
return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column)))
以上工作成功。如果UDF需要额外的参数,我仍然很难调用UDF。如何在Pyspark中将第二个参数传递给UDF?
我能够通过使用咖喱来解决这个问题。首先将UDF注册为
def testStringLength(columnName): UserDefinedFunction = udf((colValue: String) => stringLength(colValue, colName)
称为UDF
udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength("email").apply
df.withColumn("email", Column(udfInstance(_to_seq(sc, [col("email")], _to_java_column))))
这可以清理一下,但这就是我如何让它工作。
编辑:我使用咖喱的原因是,即使在我想作为字符串传递给UDF的第二个参数中使用“light”时,我也一直在使用“TypeError:'Col列'对象不可调用”错误。在Scala中,我没有遇到这个问题。我不确定为什么会在Pyspark中发生这种情况。这可能是由于Python解释器和Scala代码之间可能发生的一些复杂情况。仍然不清楚,但咖喱对我有用。
我正在编写一个用户定义的函数,它将接受数据帧中除第一列之外的所有列,并进行求和(或任何其他操作)。现在,数据帧有时可以有3列或4列或更多。会有所不同。 我知道我可以在UDF中硬编码4个列名作为传递,但在这种情况下它会有所不同,所以我想知道如何完成它? 这里有两个示例,第一个示例中我们有两列要添加,第二个示例中有三列要添加。
在计算附加信息时发生内部错误。org.eclipse.jdt.internal.core.SearchableEnvironment.(Lorg/eclipse/jdt/内部/核心/JavaProject; Lorg/eclipse/jdt/核心/WorkingCopyOwner;)
我正在尝试将两个PySpark数据帧与仅位于其中一个上的列连接起来: 现在我想生成第三个数据帧。我想要像熊猫这样的东西: 这可能吗?
我正在尝试使用ScalaTest和ScalaCheck进行基于属性的测试。我的测试概述如下: 现在我看到的是,如果我一遍又一遍地运行PropSpec1中的测试,有时第二个测试会通过,但大多数时候会失败。现在,如果0没有被b测试,那么很明显它会通过,但我认为这是它会一直尝试的事情之一。重复运行sbt clean test时,我看到了相同的行为;有时两项测试都通过了。 这对于基于属性的测试是正常的吗,
我的测试代码如下所示: 显然,这是无法编译的,因为需要一个到套接字的路径作为参数,但我不能调用,因为它不是类型,也不能调用,因为mock不将传递给它的类型的参数作为自己的参数。 那么,如何编写一个模拟工厂来传递给我的扫描仪呢?我甚至不知道如何模拟一个需要争论的类!
我想实现以下函数来激发SQL。给定一个数组,返回索引的最大值。我试过: 这很好,但仅适用于-我希望UDF适用于其他数值(例如s)。我尝试了以下方法,但我无法返回带有类型的结构: 有什么想法吗?