{
"name": "mydata",
"type": "record",
"fields": [
{"name": "data", "type": {
"type": "array", "items": {
"name": "actualData", "type": "record", "fields": [
{"name": "metadata1", "type": "int"},
{"name": "metadata2", "type": "string"},
{"name": "dataframe", "type": {
"type": "array", "items": {
"name": "dataframeRecord", "type": "record", "fields": [
{"name": "field1", "type": "int"},
{"name": "field2", "type": "int"},
{"name": "field3", "type": ["string", "null"]}]
}
}
}]
}
}
}
]
}
如果模式是相同的,并且您只想将所有记录放入相同的DataFrame中,则可以使用DataFrame unionAll方法。
http://spark.apache.org/docs/1.6.3/api/python/pyspark.sql.html#pyspark.sql.dataframe.unionall
此函数将接受一个数据规则并将其追加到另一个数据规则。问题在于,它假设列之间的顺序相同,所以您可能需要做一些工作来将它们排列起来,并为丢失的列创建空列。下面是我用来安全地联合多个数据帧的python函数
def union_multiple_dataframes(iterable_list_df):
input_dfs = list(iterable_list_df)
# First figure out all the field names
field_types = {}
for df in input_dfs:
for field in df.schema.fields:
# Check for type mismatch
if field in field_types:
if field.dataType != field_types[field.name]:
raise ValueError("Mismatched data types when unioning dataframes for field: {}".format(field))
else:
field_types[field.name] = field.dataType
# First add in empty fields so all df's have the same schema
fields = set(field_types.keys())
for i, df in enumerate(input_dfs):
missing = fields - set(df.schema.names)
for field in missing:
df = df.withColumn(field, F.lit(None))
input_dfs[i] = df
# Finally put all the df's columns in the same order, and do the actual union
sorted_dfs = [df.select(*sorted(fields)) for df in iterable_list_df]
return reduce(lambda x, y: x.unionAll(y), sorted_dfs)
input_dfs = [do_something(..) for x in y]
combined_df = union_multiple_dataframes(input_dfs)
combined_df.write.format("com.databricks.spark.avro").save("s3://my-bucket/path")
有人知道怎么做吗?显然有一个Windows命令用于这种事情... 谢谢
我正在实施一个项目,其中MySql数据被导入到hdfs使用sqoop。它有将近30张桌子。我通过推断模式和注册为临时表来读取每个表作为数据帧。我做这件事有几个问题...1.假设df1到df10的表需要实现几个连接。在MySQL中,查询将是而不是使用是否有其他连接所有数据帧有效地基于条件...
RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误:
我正在尝试将熊猫DF转换为Spark one。测向头: 代码: 我得到了一个错误:
数据帧: 我遵循了一个类似问题的解决方案:如何从for循环中构建和填充熊猫数据帧? 您可以看到这些值都是准确的,但是它返回了每个索引的整个值列表 此输出看起来正确,但正如您所看到的,只返回第一组
API需要以下多部分/表单数据格式: 我的模型对象如下所示: 我如何通过使用我的model类发送多部分数据,而不是手动创建字符串并将其作为MultiPartBody.Part对象传递? 还有,有没有可能在这个功能中添加一个图像上传呢?