我正在使用pyspark,使用spark-
csv将大的csv文件加载到数据帧中,并且作为预处理步骤,我需要对其中一列(包含json字符串)中的可用数据进行多种操作。这将返回X值,每个值都需要存储在自己的单独列中。
该功能将在UDF中实现。但是,我不确定如何从该UDF返回值列表并将其馈送到各个列中。下面是一个简单的示例:
(...)
from pyspark.sql.functions import udf
def udf_test(n):
return [n/2, n%2]
test_udf=udf(udf_test)
df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
产生以下内容:
+------+----------+--------------------+
|amount|trans_date| test|
+------+----------+--------------------+
| 28.0|2016-02-07| [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
将udf返回的两个(在此示例中)值存储在单独的列上的最佳方法是什么?现在,它们被键入为字符串:
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
root
|-- amount: float (nullable = true)
|-- trans_date: string (nullable = true)
|-- test: string (nullable = true)
无法通过单个UDF调用创建多个顶级列,但可以创建一个新的struct
。它需要具有指定的UDF returnType
:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
schema = StructType([
StructField("foo", FloatType(), False),
StructField("bar", FloatType(), False)
])
def udf_test(n):
return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))
test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])
foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
## |-- foobar: struct (nullable = true)
## | |-- foo: float (nullable = false)
## | |-- bar: float (nullable = false)
您可以使用简单的方法进一步简化架构select
:
foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+
我正在使用pyspark,用spark-csv将一个大型csv文件加载到dataframe中,作为预处理步骤,我需要对其中一列(包含json字符串)中的可用数据应用各种操作。它将返回X个值,每个值都需要存储在它们自己单独的列中。 该功能将在UDF中实现。但是,我不确定如何从该UDF返回一个值列表,并将其输入到各个列中。下面是一个简单的例子: 生成以下内容:
问题内容: SQL Server(2000/2005)函数获取表名和字段名作为参数,并从函数内的动态查询返回结果。结果应分配给变量,该变量将在存储过程中进一步使用。如何实现呢? 我收到错误消息:“只能从函数中执行函数和扩展存储过程。” 问题答案: 我不确定这如何与函数一起使用,但是如果您有一个存储过程返回一个结果集,则可以使用INSERT EXEC语句将其插入到表变量中。 只要字段匹配,那将起作用
问题内容: 我有一个非常大的数据框(大约一百万行),其中包含来自实验的数据(60位受访者)。我想将数据框分成60个数据框(每个参与者一个数据框)。 在数据帧(称为=数据)中,有一个名为“名称”的变量,它是每个参与者的唯一代码。 我已经尝试了以下方法,但是没有任何反应(或者一小时内没有停止)。我打算做的是将数据帧(数据)拆分为较小的数据帧,并将其附加到列表(数据列表)中: 我没有收到错误消息,脚本似
问题内容: 我使用此辅助函数来接收我的请求的JSON结果: 我将它作为Web应用程序中url的一部分提供了一些字符串,例如’/ api / getusers’,因此看起来像。现在我需要包含JSON数据,我从URL接收将被分配给我的变量,所以它看起来像这样的字符串结果:。然后,我将处理此JSON数据。问题在于返回响应变量。它是未定义的。谢谢! 问题答案: 这是一个异步操作,这意味着从服务器返回后很长
问题内容: 我有一个带有多个列以及一个日期列的数据框。日期格式为15年12月31日,我将其设置为日期时间对象。 我将datetime列设置为索引,并希望对数据框的每个月执行回归计算。 我相信实现此目的的方法是将数据框基于月份拆分为多个数据框,存储到数据框列表中,然后对列表中的每个数据框执行回归。 我使用过groupby可以按月成功拆分数据框,但是不确定如何正确地将groupby对象中的每个组转换为
我有一个numpy数组,一个定义数组中范围的开始/结束索引列表,以及一个值列表,其中值的数量与范围的数量相同。在循环中执行此赋值当前非常慢,因此我想以矢量化的方式将值赋给数组中的相应范围。这可能吗? 这是一个具体的简化示例: <代码>a=np。零([10]) 下面是定义a中范围的开始索引和结束索引列表,如下所示: 这是我想分配给每个范围的值列表: <代码>值=[1、2、3、4] 我有两个问题。首先