当前位置: 首页 > 知识库问答 >
问题:

Spark-从S3读取分区数据-分区是如何发生的?

胡劲
2023-03-14

当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)-
逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器?

此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?

共有1个答案

萧浩漫
2023-03-14

S3上的数据显然是HDFS外部的。

您可以通过提供一个或多个路径,或使用Hive Metastore从S3读取数据——如果您已经通过为外部S3表创建DDL,并为分区使用MSCK,或为EMR上的Hive更改table_name RECOVER PARTITIONS更新了这一点。

如果您使用:

val df = spark.read.parquet("/path/to/parquet/file.../...")

那么分区就不能保证,这取决于各种设置 - 请参阅Spark是否在读取时保持镶木地板分区?,注意API的发展并变得更好。

但是,这是:

val df = spark.read.parquet("/path/to/parquet/file.../.../partitioncolumn=*")

将按照您保存的分区结构以某种方式返回执行器上的分区,有点像SPARK bucketBy。

如果直接指定S3,驱动程序仅获取元数据。

用你的话来说:

  • “...每个执行程序直接下载数据(在工作线程节点上)?“ 是的
  • 元数据以某种方式通过驱动程序协调和S3上文件/目录位置的其他系统组件获得,但不是数据首先下载到驱动程序 - 这在设计中将是一个很大的愚蠢。但这也取决于 API 如何响应的语句格式。
 类似资料:
  • 我有一些数据存储在拼花格式的S3存储桶中,遵循类似蜂巢的分区风格,使用这些分区键:零售商-年-月-日。 如 我想在sagemaker笔记本中读取所有这些数据,我想将分区作为我的DynamicFrame的列,这样当我,包括它们。 如果我使用Glue建议的方法,分区就不会包含在我的模式中。下面是我使用的代码: 相反,通过使用普通的火花代码和DataFrame类,它可以工作,并且分区包含在我的架构中:

  • 我有以下制表符分隔的示例数据集: 我正在对此数据运行一些转换,最终数据位于spark dataset中。之后,我用“period”分区将该数据集写入s3。因为我也希望在s3文件中使用period,所以我正在从from period列创建另一列“datasetperiod”。 我的scala函数来保存TSV数据集。 在S3上保存数据集的Scala代码。为S3上的分区添加新列datasetPeriod

  • 如果我是正确的,默认情况下,spark streaming 1.6.1使用单线程从每个Kafka分区读取数据,假设我的Kafka主题分区是50,这意味着每50个分区中的消息将按顺序读取,或者可能以循环方式读取。 案例1: -如果是,那么我如何在分区级别并行化读取操作?创建多个< code > kafkautils . createdirectstream 是唯一的解决方案吗? 案例2: -如果我的

  • 我的假设是,首先,spark会从cassandra读取数据,因此在这个阶段,cassandra的大分区不会因为重新分区而分裂。重新分区将对从Cassandra加载的基础数据起作用。 我只是想知道答案,当从spark读取数据时,重新分区是否会改变数据分布,而不是再次进行分区?

  • 有人能解释一下将为Spark Dataframe创建的分区数量吗。 我知道对于RDD,在创建它时,我们可以提到如下分区的数量。 但是对于创建时的Spark数据帧,看起来我们没有像RDD那样指定分区数量的选项。 我认为唯一的可能性是,在创建数据帧后,我们可以使用重新分区API。 有人能告诉我在创建数据帧时,我们是否可以指定分区的数量。

  • 我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢