当前位置: 首页 > 知识库问答 >
问题:

将每个Spark RDD条目分别保存到S3

田马鲁
2023-03-14

例如,我们假设RDDresultrdd为:

[('cats',cats_data),('dogs',dogs_data),('flamingos',flamingos_data)]

当调用resultrdd.saveastextfile(S3n://path/to/soughere/)时,它将创建多个文件,如下所示:

1. 000_part00 [containing cats_data & dogs_data]
2. 000_part01 [containing only flamingos_data]
1. cats [containing only cats_data]
2. dogs [containing only dogs_data]
2. flamingos [containing only flamingos_data]

我认为可以使用BotoS3Connection手动写入S3,如下所示:

s3_connection = <connecting to S3 here>
bucket = s3_connection.get_bucket('animals_data')

def persist_to_s3(data_tuple):
   s3_key = bucket.create_key(key=data_tuple[0], bucket=bucket)
   s3_key.set_contents_from_string(data_tuple[1])

resultRDD.map(persist_to_s3)

不幸的是,connection和bucket对象既不是可序列化的,也不是线程安全的(我推测),所以我不能像上面那样共享节点之间的连接。

我想我可以连接到S3&在persist_to_s3函数本身中获取bucket,但是这个操作肯定会使AWS限制我的API使用,因为我有大量的RDD。

  • 为了以我所描述的格式将数据快速持久化到S3中,我还能做什么?
  • 如果使用repartition()/coalesce()是唯一正确的方法,那么哪一种更适合这种用法?

共有1个答案

乔凯康
2023-03-14

我想到的另一个选择是使用repartition()/coalesce()来减少分区的数量,然后用mapPartitions()来预制上面的操作,这会起作用,但会慢得多。

Repartition&MapPartitions是相对较快的选项,但您提到过它很慢。我认为您可以考虑更传统的解决方案,如多线程读写器

1.使用您描述的格式写入结果数据;
2.使用多线程读/写器模型并行地将数据写入S3存储。类似于“并行读取器--读到-->并发阻塞队列-->并行写入器--写到---->S3”的工作流。

 类似资料:
  • 保存/记录在AWS SNS主题上发布的每条消息的最简单方法是什么?我想可能有一个神奇的设置可以自动将它们推送到S3或数据库,或者可能是一个自动支持HTTP目标的数据库服务,但似乎并非如此。也许需要通过Lambda函数来完成? 目的只是为了在设置一些SNS发布时进行基本的诊断和调试。我并不真正关心大规模或快速查询,只想一次记录和执行几分钟对所有活动的基本查询。

  • 我需要置换一个数组并将每个排列保存在arrayList中,我使用递归方法,但它只重复保存一个结果。

  • 问题内容: 当前不支持在file.contents为流时向多个目标发送消息。解决此问题的方法是什么? 问题答案: 当前,在将file.contents用作流时,每个目标必须使用两个流。将来可能会解决此问题。 编辑:此错误现已在gulp中修复。您原始帖子中的代码应该可以正常工作。

  • 问题内容: 我有一个熊猫DataFrame,我想上传到新的CSV文件。问题是在将文件传输到s3之前,我不想在本地保存文件。是否有像to_csv这样的方法可以将数据帧直接写入s3?我正在使用boto3。 这是我到目前为止的内容: 问题答案: 您可以使用:

  • 假设我有这样一个模型结构: 考虑到这一点,现在我想为PizzaOrder创建一个新条目--但我不想要奶酪、沙司或披萨的任何副本--只是PizzaOrder来表示刚刚订购的披萨。