当前位置: 首页 > 知识库问答 >
问题:

在pyspark中从数据帧构建StructType

容宏逸
2023-03-14

我是新的spark和python,面临着从元数据文件构建模式的困难,该模式可以应用于我的数据文件。场景:数据文件的元数据文件(csv格式),包含列及其类型:例如:

id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0

我已成功将其转换为如下数据帧:

+--------------------+---------------+
|                name|           type|
+--------------------+---------------+
|                  id|  IntegerType()|
|          created_at|TimestampType()|
|          updated_at|   StringType()|

但是当我尝试用这个将其转换为StructField格式时

fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))

schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()

然后使用

schemaFinal = StructType(schemaList)

我得到以下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType

一旦我准备好了模式,我想使用createDataFrame来应用于我的数据文件。这个过程必须为许多表完成,所以我不想硬编码类型,而是使用元数据文件来构建模式,然后应用于RDD。

提前感谢。

共有3个答案

厉钊
2023-03-14
val columns: Array[String] = df1.columns
val reorderedColumnNames: Array[String] = df2.columns //or do the reordering you want
val result: DataFrame = dataFrame.select(reorderedColumnNames.head, reorderedColumnNames.tail: _*)
庾兴发
2023-03-14

可以遵循以下步骤来更改数据类型对象

data_schema=[
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True)
]



final_struct=StructType(fields=data_schema)

df=spark.read.json('/home/abcde/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json', schema=final_struct)



df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
郑胡媚
2023-03-14

字段的参数必须是DataType对象的列表。这:

.map(lambda l:([StructField(l.name, l.type, 'true')]))

在收集数据类型(list[list[tuple[DataType]])的数据类型(list[tuple[DataType]])的元组(Rows)的列表(list[tuple[DataType]])的元组(Rows)的列表(list[tuple[DataType]]))的列表()后生成,更不用说可为null的参数应该是布尔值而不是字符串。

您的第二次尝试:

.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).

在收集对象后生成对象的列表。

所显示记录的正确架构应大致如下所示:

from pyspark.sql.types import *

StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])

尽管为这样的任务使用分布式数据结构是一种严重的过度杀戮,更不用说低效了,但您可以尝试如下调整您的第一个解决方案

StructType([
    StructField(name, eval(type), True) for (name, type) in  df.rdd.collect()
])

但它不是特别安全(eval)。从JSON/dictionary构建模式可能更容易。假设您有一个从类型描述映射到规范类型名称的函数:

def get_type_name(s: str) -> str:
    """
    >>> get_type_name("int")
    'integer'
    """
    _map = {
        'int': IntegerType().typeName(),
        'timestamp': TimestampType().typeName(),
        # ...
    } 
    return _map.get(s, StringType().typeName())

您可以构建以下形状的词典:

schema_dict = {'fields': [
    {'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}

并将其馈送到StructType。fromJson:

StructType.fromJson(schema_dict)

 类似资料:
  • 我想使用PySpark创建spark数据帧,为此我在PyCharm中运行了以下代码: 但是,它会返回此错误: 使用 Spark 的默认 log4j 配置文件:组织/缓存/火花/log4j-defaults.属性 将默认日志级别设置为“WARN”。要调整日志记录级别,请使用 sc.setLogLevel(新级别)。对于 SparkR,请使用 setLogLevel(新级别)。18/01/08 10:

  • 我有两个数据帧,DF1和DF2,DF1是存储来自DF2的任何附加信息的主机。 假设DF1是以下格式, DF2包含DF1中已经存在的2个条目和两个新条目。(itemId和item被视为一个组,可以被视为连接的键) 我需要合并两个数据框,以便增加现有项目计数并插入新项目。 结果应该是这样的: 我有一种方法可以做到这一点,但不确定这种方法是否有效或正确

  • 我是pyspark的新手,我来尝试做一些像下面这样的事情,为每个cookie调用一个函数Print细节,然后将结果写入文件。spark.sql查询返回正确的数据,我也可以将其序列化为文件。有人可以帮助每个cookie上的for语句。调用UDF的语法应该是什么,如何将输出写入文本文件? 任何帮助是值得赞赏的。谢谢

  • 我有两个具有大量(几百万到几千万)行的数据帧。我想为他们牵线搭桥。 在我目前使用的BI系统中,您可以通过首先对特定键进行分区,然后在该键上进行连接来快速完成此操作。 这是我在Spark中需要遵循的模式吗,或者这并不重要?乍一看,在分区之间转移数据似乎浪费了很多时间,因为没有正确地进行预分区。 如果有必要,我该怎么做?

  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 问题内容: 我在Databricks工作。 我有一个包含500行的数据框,我想创建两个包含100行的数据框,另一个包含剩余的400行。 我尝试了以下操作,但收到错误消息 问题答案: 最初,我误会了,并以为您想分割这些列。如果要选择行的子集,一种方法是使用创建索引列。从文档: 保证生成的ID是单调递增且唯一的,但不是连续的。 您可以使用此ID对数据框进行排序,并使用该ID对其子集进行排序,以确保准确