当前位置: 首页 > 知识库问答 >
问题:

在Spark中,如何将多个数据帧转换成一个Avro?

缪坚诚
2023-03-14
{
    "name": "mydata",
    "type": "record",
    "fields": [
        {"name": "data", "type": {
            "type": "array", "items": {
                "name": "actualData", "type": "record", "fields": [
                    {"name": "metadata1", "type": "int"},
                    {"name": "metadata2", "type": "string"},
                    {"name": "dataframe", "type": {
                        "type": "array", "items": {
                            "name": "dataframeRecord", "type": "record", "fields": [
                                {"name": "field1", "type": "int"},
                                {"name": "field2", "type": "int"},
                                {"name": "field3", "type": ["string", "null"]}]
                            }
                        }
                    }]
                }
            }
        }
    ]
}

共有1个答案

罗茂实
2023-03-14

如果模式是相同的,并且您只想将所有记录放入相同的DataFrame中,则可以使用DataFrame unionAll方法。

http://spark.apache.org/docs/1.6.3/api/python/pyspark.sql.html#pyspark.sql.dataframe.unionall

此函数将接受一个数据规则并将其追加到另一个数据规则。问题在于,它假设列之间的顺序相同,所以您可能需要做一些工作来将它们排列起来,并为丢失的列创建空列。下面是我用来安全地联合多个数据帧的python函数

def union_multiple_dataframes(iterable_list_df):
    input_dfs = list(iterable_list_df)

    # First figure out all the field names
    field_types = {}
    for df in input_dfs:
        for field in df.schema.fields:
            # Check for type mismatch
            if field in field_types:
                if field.dataType != field_types[field.name]:
                    raise ValueError("Mismatched data types when unioning dataframes for field: {}".format(field))
            else:
                field_types[field.name] = field.dataType

    # First add in empty fields so all df's have the same schema
    fields = set(field_types.keys())
    for i, df in enumerate(input_dfs):
        missing = fields - set(df.schema.names)
        for field in missing:
            df = df.withColumn(field, F.lit(None))

        input_dfs[i] = df

    # Finally put all the df's columns in the same order, and do the actual union
    sorted_dfs = [df.select(*sorted(fields)) for df in iterable_list_df]
    return reduce(lambda x, y: x.unionAll(y), sorted_dfs)
input_dfs = [do_something(..) for x in y]
combined_df = union_multiple_dataframes(input_dfs)
combined_df.write.format("com.databricks.spark.avro").save("s3://my-bucket/path")
 类似资料:
  • 有人知道怎么做吗?显然有一个Windows命令用于这种事情... 谢谢

  • 我正在实施一个项目,其中MySql数据被导入到hdfs使用sqoop。它有将近30张桌子。我通过推断模式和注册为临时表来读取每个表作为数据帧。我做这件事有几个问题...1.假设df1到df10的表需要实现几个连接。在MySQL中,查询将是而不是使用是否有其他连接所有数据帧有效地基于条件...

  • RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误:

  • 我正在尝试将熊猫DF转换为Spark one。测向头: 代码: 我得到了一个错误:

  • 数据帧: 我遵循了一个类似问题的解决方案:如何从for循环中构建和填充熊猫数据帧? 您可以看到这些值都是准确的,但是它返回了每个索引的整个值列表 此输出看起来正确,但正如您所看到的,只返回第一组

  • API需要以下多部分/表单数据格式: 我的模型对象如下所示: 我如何通过使用我的model类发送多部分数据,而不是手动创建字符串并将其作为MultiPartBody.Part对象传递? 还有,有没有可能在这个功能中添加一个图像上传呢?