我试图以CSV格式保存py spark . SQL . data frame . data frame(也可以是其他格式,只要它易于阅读)。
到目前为止,我找到了几个示例来保存DataFrame。然而,每次我编写它时,它都会丢失信息。
数据集示例:
# Create an example Pyspark DataFrame
from pyspark.sql import Row
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('A', 'AA', 'mail1', 100000)
employee2 = Employee('B', 'BB', 'mail2', 120000 )
employee3 = Employee('C', None, 'mail3', 140000 )
employee4 = Employee('D', 'DD', 'mail4', 160000 )
employee5 = Employee('E', 'EE', 'mail5', 160000 )
department1 = Row(id='123', name='HR')
department2 = Row(id='456', name='OPS')
department3 = Row(id='789', name='FN')
department4 = Row(id='101112', name='DEV')
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2, employee5])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4, employee3])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])
departmentsWithEmployees_Seq = [departmentWithEmployees1, departmentWithEmployees2]
dframe = spark.createDataFrame(departmentsWithEmployees_Seq)
为了将这个文件保存为CSV,我首先尝试了这个解决方案:
type(dframe)
Out[]: pyspark.sql.dataframe.DataFrame
dframe.write.csv('junk_mycsv.csv')
不幸是,这导致了以下错误:
org.apache.spark.sql.AnalysisException: CSV data source does not support struct<id:string,name:string> data type.;
这就是我尝试另一种可能性的原因,将spark数据帧转换成pandas数据帧,然后保存它。如本例所述。
pandas_df = dframe.toPandas()
效果很好!但是,如果我显示我的数据,则缺少数据:
print(pandas_df.head())
department employees
0 (123, HR) [(A, AA, mail1, 100000), (B, BB, mail2, 120000...
1 (456, OPS) [(C, None, mail3, 140000), (D, DD, mail4, 1600...
正如您在下面的快照中看到的,我们缺少信息。因为数据应该是这样的:
department employees
0 id:123, name:HR firstName: A, lastName: AA, email: mail1, salary: 100000
# Info is missing like 'id', 'name', 'firstName', 'lastName', 'email' etc.
# For the complete expected example, see screenshow below.
仅供参考:我在使用Python的Databricks工作。
因此,如何在不丢失信息的情况下写入我的数据(上例中的dframe)?
非常感谢提前!
编辑为Pault添加图片,以显示csv(和标题)的格式。
编辑2替换图片,例如csv输出:
运行Paault的代码后:
from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
.repartition(1).write.csv("junk_mycsv.csv", header= True)
输出不整洁,因为大多数列标题都是空的(由于嵌套格式?)。仅复制第一行:
department employees (empty ColName) (empty ColName) (and so on)
{\id\":\"123\" \"name\":\"HR\"}" [{\firstName\":\"A\" \"lastName\":\"AA\" (...)
您的数据框架具有以下模式:
dframe.printSchema()
#root
# |-- department: struct (nullable = true)
# | |-- id: string (nullable = true)
# | |-- name: string (nullable = true)
# |-- employees: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- firstName: string (nullable = true)
# | | |-- lastName: string (nullable = true)
# | | |-- email: string (nullable = true)
# | | |-- salary: long (nullable = true)
因此,部门
列是具有两个命名字段的 StructType
,员工
列是具有四个命名字段的结构数组。看起来您想要的是以一种保存每条记录的键
和值
的格式写入数据。
一种选择是以JSON格式而不是CSV格式写入文件:
dframe.write.json("junk.json")
这将产生以下输出:
{"department":{"id":"123","name":"HR"},"employees":[{"firstName":"A","lastName":"AA","email":"mail1","salary":100000},{"firstName":"B","lastName":"BB","email":"mail2","salary":120000},{"firstName":"E","lastName":"EE","email":"mail5","salary":160000}]}
{"department":{"id":"456","name":"OPS"},"employees":[{"firstName":"C","email":"mail3","salary":140000},{"firstName":"D","lastName":"DD","email":"mail4","salary":160000}]}
或者,如果您想保持CSV格式,可以在编写CSV之前使用to_json
将每一列转换为json。
# looping over all columns
# but you can also just limit this to the columns you want to convert
from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
.write.csv("junk_mycsv.csv")
这会产生以下输出:
"{\"id\":\"123\",\"name\":\"HR\"}","[{\"firstName\":\"A\",\"lastName\":\"AA\",\"email\":\"mail1\",\"salary\":100000},{\"firstName\":\"B\",\"lastName\":\"BB\",\"email\":\"mail2\",\"salary\":120000},{\"firstName\":\"E\",\"lastName\":\"EE\",\"email\":\"mail5\",\"salary\":160000}]"
"{\"id\":\"456\",\"name\":\"OPS\"}","[{\"firstName\":\"C\",\"email\":\"mail3\",\"salary\":140000},{\"firstName\":\"D\",\"lastName\":\"DD\",\"email\":\"mail4\",\"salary\":160000}]"
请注意,双引号是转义的。
问题内容: 以下代码存在问题: 是,是。在不丢失信息的情况下,在这两种数字类型之间进行转换以使用编码器和解码器的唯一方法是吗? 注意,在您的典型情况下,我不尝试直接进行数值转换。我更关心维护随机数生成器的统计属性。 问题答案: 看到-1与以32位运行的进程一致。 例如,请参阅Go1.1发行说明(已引入) 使用可以帮助您查看发生了什么 事实证明,OP实战程序([在注释中]确认它最初以32位运行(因此
我有一个. csv文件编码在UCS-2LE BOM。我需要对它进行一些更改,我想使用preg_replace,所以我想将文件转换为UTF-8。然而,当我转换它时,所有的空格都消失了,所有属于同一行的单词都粘在一起。 我的代码是: 进行转换的正确方法是什么,这样我就不会丢失任何空格或字符? 转换前-Excel中的屏幕截图: 转换文件后:
问题内容: 我正在使用SQL数据库,我有一列名为“价格”。创建数据库后,将“价格”列设置为“我”,需要将其类型更改为不丢失数据库中的数据。这应该通过SQL脚本来完成 我想到了创建一个新列,将数据移到其中,删除旧列,然后重命名新创建的列。 有人可以帮我举个例子吗?在SQL中也有一个函数可以将字符串解析为十进制? 谢谢 问题答案: 您无需添加新列两次,只需在更新新列后删除旧列即可: 请注意,如果不是数
我编写了一个非常简单的Flink流媒体作业,它使用从Kafka获取数据。 这工作得很好,每当我在Kafka上将某些内容放入主题时,它都会被我的Flink作业接收并处理。现在我试图看看如果我的Flink作业由于某种原因不在线会发生什么。所以我关闭了flink作业并继续向Kafka发送消息。然后我再次开始我的Flink作业,并期望它会处理同时发送的消息。 然而,我得到了以下信息: 因此,它基本上忽略了
我们目前基本上通过以下简化机制确认消息: 基本上,每当我们暂时不能处理消息时(在IOExceptions的情况下),我们希望在以后的时间再次接收它。 但这不起作用,因为acknowledge假设同一分区内以前的所有消息都已成功处理。在我们的IOException案例中,失败的消息将被跳过,但可能会被同一分区上具有更高索引的不同消息确认。 我们对如何解决这个问题有一些想法,但这意味着需要一些棘手的解
我有一个表tablename和列col1-col10。不是每行都填充了col4,但每行都填充了col1、col2、col3。我想在col4满足条件时获取所有{col1,col2,col3}元组,然后从TableName中获取与元组{col1,col2,col3}匹配的所有行。 我怎样才能做到这一点?