当前位置: 首页 > 知识库问答 >
问题:

如何使用AWS胶水/火花将S3中的CSVs分区和分割转换为分区和分割拼花地板

淳于健
2023-03-14
s3://my-data-lake/test-table/
    2017/01/01/
        part-0000-blah.csv.gz
        .
        .
        part-8000-blah.csv.gz
    2017/01/02/
        part-0000-blah.csv.gz
        .
        .
        part-7666-blah.csv.gz

我如何使用胶水/火花转换成拼花,这也是分区的日期和分裂在n个文件每天?。这些示例不包括分区、拆分或供应(多少节点和多大节点)。每天包含几百GBS。

因为源CSV不一定在正确的分区中(错误的日期),并且大小不一致,所以我希望用正确的分区和更一致的大小写到分区的parquet。

共有1个答案

蔺劲
2023-03-14

由于源CSV文件的日期不一定正确,您可以向它们添加关于收集日期时间的附加信息(或者使用任何已经可用的日期):

{"collectDateTime": {
    "timestamp": 1518091828,
    "timestampMs": 1518091828116,
    "day": 8,
    "month": 2,
    "year": 2018
}}

然后,您的作业可以在输出DynamicFrame中使用这些信息,并最终将它们用作分区。如何实现这一点的一些示例代码:

from awsglue.transforms import *
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions

import sys
import datetime

###
# CREATE THE NEW SIMPLIFIED LINE
##
def create_simplified_line(event_dict):

    # collect date time
    collect_date_time_dict = event_dict["collectDateTime"]

    new_line = {
        # TODO: COPY YOUR DATA HERE
        "myData": event_dict["myData"],
        "someOtherData": event_dict["someOtherData"],
        "timestamp": collect_date_time_dict["timestamp"],
        "timestampmilliseconds": long(collect_date_time_dict["timestamp"]) * 1000,
        "year": collect_date_time_dict["year"],
        "month": collect_date_time_dict["month"],
        "day": collect_date_time_dict["day"]
    }

    return new_line


###
# MAIN FUNCTION
##

# context
glueContext = GlueContext(SparkContext.getOrCreate())

# fetch from previous day source bucket
previous_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)

# build s3 paths
s3_path = "s3://source-bucket/path/year={}/month={}/day={}/".format(previous_date.year, previous_date.month, previous_date.day)

# create dynamic_frame
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3_path]}, format="json", format_options={}, transformation_ctx="dynamic_frame")

# resolve choices (optional)
dynamic_frame_resolved = ResolveChoice.apply(frame=dynamic_frame,choice="project:double",transformation_ctx="dynamic_frame_resolved")

# transform the source dynamic frame into a simplified version
result_frame = Map.apply(frame=dynamic_frame_resolved, f=create_simplified_line)

# write to simple storage service in parquet format
glueContext.write_dynamic_frame.from_options(frame=result_frame, connection_type="s3", connection_options={"path":"s3://target-bucket/path/","partitionKeys":["year", "month", "day"]}, format="parquet")

没有测试它,但脚本只是如何实现这一点的一个样本,而且相当简单。

根据“partition_col”转换为dataframe和分区

partitioned_dataframe=datasource0.todf().repartitioned(1)

转换回DynamicFrame以进行进一步处理。

dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3_path], 'groupFiles': 'inPartition', 'groupSize': 1024 * 1024}, format="json", format_options={}, transformation_ctx="dynamic_frame")

值得一提的是,这与coalesce不同,但如果您的目标是减少每个分区的文件数量,它可能会有所帮助。

2)如果目标中已经存在文件,它是否会安全地添加它(而不是覆盖或删除)

Glue对write_dynamic_frame.from_options的默认SaveMode是追加。

 类似资料:
  • 我们有一个以红移方式处理数据的用例。但我想在S3中创建这些表的备份,以便使用Spectrum查询这些表。 为了将表从Redshift移动到S3,我使用了一个胶水ETL。我已经为AWS红移创建了一个爬虫程序。胶水作业将数据转换为拼花地板,并将其存储在S3中,按日期进行分区。然后,另一个爬虫会对S3文件进行爬行,以再次对数据进行编目。 如何消除第二个爬虫并在作业本身中执行此操作?

  • 谁能给我解释一下吗? 然而,另一方面是,对于不能保证产生已知分区的转换,输出RDD将没有分区器集。例如,如果对哈希分区的键/值对RDD调用map(),则传递给map()的函数在理论上可以更改每个元素的键,因此结果将不会有分区器。Spark不会分析函数以检查它们是否保留密钥。相反,它提供了另外两个操作,mapValues()和flatMap Values(),它们保证每个元组的键保持不变。 Mate

  • 关于雪花的新功能--推断模式表函数,我有一个问题。INFER模式函数在parquet文件上执行得很好,并返回正确的数据类型。但是,当parquet文件被分区并存储在S3中时,INFER模式的功能与pyspark Dataframes不同。 在DataFrames中,分区文件夹名称和值作为最后一列读取;在雪花推断模式中有没有一种方法可以达到同样的结果? 示例: 示例:{“AGMT_GID”:1714

  • 目前,我在S3中有数千个无头、管道分隔的GZIP压缩文件,总计约10TB,具有相同的模式。在AWS Glue中,(1)添加头文件,(2)使用文件中的“日期”字段转换为按周划分的拼花格式,(3)将文件添加到Glue数据目录中,以便在AWS Athena中进行查询?

  • 我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的

  • 所以,我想对我的spark数据帧执行某些操作,将它们写入DB,并在最后创建另一个数据帧。看起来是这样的: 这给我一个错误,因为map分区期望返回的类型,但这里是。我知道这在forEach分区中是可能的,但我也想做映射。单独做会有开销(额外的火花工作)。该怎么办? 谢谢