当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)-
逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器?
此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?
S3上的数据显然是HDFS外部的。
您可以通过提供一个或多个路径,或使用Hive Metastore从S3读取数据——如果您已经通过为外部S3表创建DDL,并为分区使用MSCK,或为EMR上的Hive更改table_name RECOVER PARTITIONS更新了这一点。
如果您使用:
val df = spark.read.parquet("/path/to/parquet/file.../...")
那么分区就不能保证,这取决于各种设置 - 请参阅Spark是否在读取时保持镶木地板分区?,注意API的发展并变得更好。
但是,这是:
val df = spark.read.parquet("/path/to/parquet/file.../.../partitioncolumn=*")
将按照您保存的分区结构以某种方式返回执行器上的分区,有点像SPARK bucketBy。
如果直接指定S3,驱动程序仅获取元数据。
用你的话来说:
我有一些数据存储在拼花格式的S3存储桶中,遵循类似蜂巢的分区风格,使用这些分区键:零售商-年-月-日。 如 我想在sagemaker笔记本中读取所有这些数据,我想将分区作为我的DynamicFrame的列,这样当我,包括它们。 如果我使用Glue建议的方法,分区就不会包含在我的模式中。下面是我使用的代码: 相反,通过使用普通的火花代码和DataFrame类,它可以工作,并且分区包含在我的架构中:
我有以下制表符分隔的示例数据集: 我正在对此数据运行一些转换,最终数据位于spark dataset中。之后,我用“period”分区将该数据集写入s3。因为我也希望在s3文件中使用period,所以我正在从from period列创建另一列“datasetperiod”。 我的scala函数来保存TSV数据集。 在S3上保存数据集的Scala代码。为S3上的分区添加新列datasetPeriod
如果我是正确的,默认情况下,spark streaming 1.6.1使用单线程从每个Kafka分区读取数据,假设我的Kafka主题分区是50,这意味着每50个分区中的消息将按顺序读取,或者可能以循环方式读取。 案例1: -如果是,那么我如何在分区级别并行化读取操作?创建多个< code > kafkautils . createdirectstream 是唯一的解决方案吗? 案例2: -如果我的
我的假设是,首先,spark会从cassandra读取数据,因此在这个阶段,cassandra的大分区不会因为重新分区而分裂。重新分区将对从Cassandra加载的基础数据起作用。 我只是想知道答案,当从spark读取数据时,重新分区是否会改变数据分布,而不是再次进行分区?
有人能解释一下将为Spark Dataframe创建的分区数量吗。 我知道对于RDD,在创建它时,我们可以提到如下分区的数量。 但是对于创建时的Spark数据帧,看起来我们没有像RDD那样指定分区数量的选项。 我认为唯一的可能性是,在创建数据帧后,我们可以使用重新分区API。 有人能告诉我在创建数据帧时,我们是否可以指定分区的数量。
我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢