我正在使用数据库。假设我有两个Spark Dataframes(我正在使用PySpark):
如果df_source具有以下模式:
root
|-- name: string (nullable = false)
|-- id: long (nullable = false)
|-- age: long (nullable = true)
df_target具有以下架构:
root
|-- name: string (nullable = true)
|-- id: long (nullable = false)
|-- age: long (nullable = false)
如何有效地创建另一个数据帧,df_final其中可以将df_source中的(null = true/false)属性强制到df_target?
我尝试了以下方法:
df_final = spark.createDataFrame(df_target.rdd, schema = df_source.schema)
通过这种方法,我能够获得所需的结果,但是对于我拥有的数据集大小来说,它似乎需要很长时间。对于较小的数据集,它可以正常工作。对于较大的数据集,使用 collect() 函数而不是 rdd 转换显然更糟。
我想指出的是,我在这里要做的唯一一件事是从源模式中复制可空性部分,并在目标中对其进行相应的更改,以获得最终的数据帧。
是否有一种方法可以执行某种类型的可空性转换,在性能方面类似于.withColumn(),不需要RDD转换,也不需要代码中明确的列名规范?列顺序已在源和目标之间对齐。
附加上下文:我需要这样做的原因是因为我需要使用Spark BQ连接器将df_final写入(追加)到Google BigQuery表。因此,即使我的Spark Dataframe在列中没有任何空值,但null属性设置为true,BigQuery表也会拒绝写入操作,因为BigQuery表中的该列可能将null属性设置为false,并且模式不匹配。
由于您知道age
不能为空,因此可以将合并
age和常量文本以创建不可为空的字段。对于<code>可为空的<code>字段必须从<code>false</code>转换为<code>true</code>的字段,可以使用<code>表达式。
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql import functions as F
df_source_schema = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", LongType(), True),
])
df_target_schema = StructType([
StructField("name", StringType(), True),
StructField("id", LongType(), False),
StructField("age", LongType(), False),
])
df_source = spark.createDataFrame([("a", 1, 18, ), ], df_source_schema)
df_source.printSchema()
"""
root
|-- name: string (nullable = false)
|-- id: long (nullable = false)
|-- age: long (nullable = true)
"""
df_target = spark.createDataFrame([("a", 1, 18), ], df_target_schema)
df_target.printSchema()
"""
root
|-- name: string (nullable = true)
|-- id: long (nullable = false)
|-- age: long (nullable = false)
"""
# Construct selection expression based on the logic described above
target_field_nullable_map = {field.name: field.nullable for field in df_target.schema}
selection_expr = []
for src_field in df_source.schema:
field_name = src_field.name
field_type = src_field.dataType
if target_field_nullable_map[field_name] != src_field.nullable:
if src_field.nullable:
selection_expr.append(F.when(F.col(field_name).isNotNull(), F.col(field_name)).otherwise(F.lit(None)).alias(field_name))
else:
selection_expr.append(F.coalesce(F.col(field_name), F.lit("-1").cast(field_type)).alias(field_name))
else:
selection_expr.append(F.col(field_name))
df_final = df_target.select(*selection_expr)
df_final.printSchema()
"""
root
|-- name: string (nullable = false)
|-- id: long (nullable = false)
|-- age: long (nullable = true)
"""
对于要使合并
表达式为 null,
其子表达式的所有子表达式都必须为 null,从此处可以看出。因为 lit
是非空表达式,当值 != 空
合并
导致不可为空的列时。
当表达式可以为 null 时
,如果 else 表达式可为 null,则任何分支都可以为 null,如此处所述。
有时(例如用于测试和bechmark)我想强制执行在DataFrame上定义的转换。AFAIK调用像这样的操作并不能确保所有都被实际计算,可能只计算所有的子集(参见下面的示例) 我的解决方案是使用df将数据帧写入HDFS。写saveAsTable,但这会将我的系统与我不想再保留的表“混在一起”。 那么,触发数据帧评估的最佳方式是什么呢? 编辑: 请注意,最近还讨论了火花开发者列表:http://a
问题内容: 如何在spark数据帧中强制转换结构数组? 让我通过一个例子来说明我要做什么。我们将从创建一个数据框开始,该数据框包含行和嵌套行的数组。我的整数尚未在数据框中强制转换,它们已创建为字符串: 这是创建的数据框的架构: 我想做的是将所有可以为整数的字符串都转换为整数。我尝试执行以下操作,但没有成功: 我有以下异常: 任何人都有正确的查询将所有值转换为INTEGER吗?我将不胜感激。 非常感
我正在使用数据帧读取。拼花地板文件,但不是将它们转换为rdd来进行我的正常处理,我想对它们进行处理。 所以我有我的文件: 即使从数据帧转换为RDD,我也会收到以下错误: :26:错误:值zipWithIndex不是组织的成员。阿帕奇。火花sql。一行 任何人都知道如何做我正在尝试做的事情,本质上是尝试获取值和列索引。 我在想这样的事情: 但最后一部分被卡住了,因为不知道如何做zipWithInde
我正在使用: Python 3.6.8 火花2.4.4 我在spark类路径中有以下JAR: http://repo1.maven.org/maven2/com/databricks/spark-redshift_2.11/2.0.1/spark-redshift_2.11-2.0.1.jar http://repo1.maven.org/maven2/com/databricks/spark-a
我开始使用Spark DataFrames,我需要能够枢轴的数据,以创建多个列1列多行。在Scalding中有内置的功能,我相信Python中的熊猫,但是我找不到任何新的Spark Dataframe。 我假设我可以编写某种自定义函数来实现这一点,但我甚至不知道如何开始,特别是因为我是Spark的新手。如果有人知道如何使用内置功能或如何在Scala中编写东西的建议来实现这一点,我们将不胜感激。
null 非常感谢任何指向文档或非常基本的示例的指针。