我是新的spark和python,面临着从元数据文件构建模式的困难,该模式可以应用于我的数据文件。场景:数据文件的元数据文件(csv格式),包含列及其类型:例如:
id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0
我已成功将其转换为如下数据帧:
+--------------------+---------------+
| name| type|
+--------------------+---------------+
| id| IntegerType()|
| created_at|TimestampType()|
| updated_at| StringType()|
但是当我尝试用这个将其转换为StructField格式时
fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))
或
schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()
然后使用
schemaFinal = StructType(schemaList)
我得到以下错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType
一旦我准备好了模式,我想使用createDataFrame来应用于我的数据文件。这个过程必须为许多表完成,所以我不想硬编码类型,而是使用元数据文件来构建模式,然后应用于RDD。
提前感谢。
val columns: Array[String] = df1.columns
val reorderedColumnNames: Array[String] = df2.columns //or do the reordering you want
val result: DataFrame = dataFrame.select(reorderedColumnNames.head, reorderedColumnNames.tail: _*)
可以遵循以下步骤来更改数据类型对象
data_schema=[
StructField("age", IntegerType(), True),
StructField("name", StringType(), True)
]
final_struct=StructType(fields=data_schema)
df=spark.read.json('/home/abcde/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json', schema=final_struct)
df.printSchema()
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
字段的参数必须是DataType
对象的列表。这:
.map(lambda l:([StructField(l.name, l.type, 'true')]))
在收集数据类型(list[list[tuple[DataType]])的数据类型(
list[tuple[DataType]])的元组(
Rows
)的列表(list[tuple[DataType]])的元组(
Rows
)的列表(list[tuple[DataType]]))的列表(
)后生成,更不用说可为null的参数应该是布尔值而不是字符串。
您的第二次尝试:
.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).
在收集对象后生成对象的列表。
所显示记录的正确架构应大致如下所示:
from pyspark.sql.types import *
StructType([
StructField("id", IntegerType(), True),
StructField("created_at", TimestampType(), True),
StructField("updated_at", StringType(), True)
])
尽管为这样的任务使用分布式数据结构是一种严重的过度杀戮,更不用说低效了,但您可以尝试如下调整您的第一个解决方案:
StructType([
StructField(name, eval(type), True) for (name, type) in df.rdd.collect()
])
但它不是特别安全(
eval)。从JSON/dictionary构建模式可能更容易。假设您有一个从类型描述映射到规范类型名称的函数:
def get_type_name(s: str) -> str:
"""
>>> get_type_name("int")
'integer'
"""
_map = {
'int': IntegerType().typeName(),
'timestamp': TimestampType().typeName(),
# ...
}
return _map.get(s, StringType().typeName())
您可以构建以下形状的词典:
schema_dict = {'fields': [
{'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}
并将其馈送到StructType。fromJson:
StructType.fromJson(schema_dict)
我想使用PySpark创建spark数据帧,为此我在PyCharm中运行了以下代码: 但是,它会返回此错误: 使用 Spark 的默认 log4j 配置文件:组织/缓存/火花/log4j-defaults.属性 将默认日志级别设置为“WARN”。要调整日志记录级别,请使用 sc.setLogLevel(新级别)。对于 SparkR,请使用 setLogLevel(新级别)。18/01/08 10:
我有两个数据帧,DF1和DF2,DF1是存储来自DF2的任何附加信息的主机。 假设DF1是以下格式, DF2包含DF1中已经存在的2个条目和两个新条目。(itemId和item被视为一个组,可以被视为连接的键) 我需要合并两个数据框,以便增加现有项目计数并插入新项目。 结果应该是这样的: 我有一种方法可以做到这一点,但不确定这种方法是否有效或正确
我是pyspark的新手,我来尝试做一些像下面这样的事情,为每个cookie调用一个函数Print细节,然后将结果写入文件。spark.sql查询返回正确的数据,我也可以将其序列化为文件。有人可以帮助每个cookie上的for语句。调用UDF的语法应该是什么,如何将输出写入文本文件? 任何帮助是值得赞赏的。谢谢
我有两个具有大量(几百万到几千万)行的数据帧。我想为他们牵线搭桥。 在我目前使用的BI系统中,您可以通过首先对特定键进行分区,然后在该键上进行连接来快速完成此操作。 这是我在Spark中需要遵循的模式吗,或者这并不重要?乍一看,在分区之间转移数据似乎浪费了很多时间,因为没有正确地进行预分区。 如果有必要,我该怎么做?
假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:
问题内容: 我在Databricks工作。 我有一个包含500行的数据框,我想创建两个包含100行的数据框,另一个包含剩余的400行。 我尝试了以下操作,但收到错误消息 问题答案: 最初,我误会了,并以为您想分割这些列。如果要选择行的子集,一种方法是使用创建索引列。从文档: 保证生成的ID是单调递增且唯一的,但不是连续的。 您可以使用此ID对数据框进行排序,并使用该ID对其子集进行排序,以确保准确