当前位置: 首页 > 知识库问答 >
问题:

将数据导入Spark时,如何设置分区/节点的数量

董琦
2023-03-14

问题:我想使用以下命令将数据从S3导入Spark EMR:

data = sqlContext.read.json("s3n://.....")

我是否可以设置Spark用于加载和处理数据的节点数?这是我如何处理数据的示例

data.registerTempTable("table")
SqlData = sqlContext.sql("SELECT * FROM table")

上下文:数据不太大,加载到Spark和查询需要很长时间。我认为Spark将数据分割成了太多的节点。我希望能够手动设置。我知道在处理rdd和< code > sc . parallelism 时,我可以将分区数量作为输入传递。还有,我看过< code>repartition(),但是不确定是否能解决我的问题。在我的例子中,变量< code>data是一个< code>DataFrame。

让我更精确地定义分区。定义一:通常称为“分区键”,其中选择一列并建立索引以加快查询速度(这不是我想要的)。定义二:(这是我关心的地方)假设您有一个数据集,Spark决定将其分布在许多节点上,以便它可以并行运行对数据的操作。如果数据大小太小,这可能会进一步减慢过程。我如何设置该值

共有3个答案

汤念
2023-03-14

“输入”分区的数量由文件系统配置固定。

1Go的1个文件,块大小为128M会给你10个任务。我不确定你能改变它。

重新分区可能非常糟糕,如果你有很多输入分区,这将在分区之间产生大量的随机(数据流量)。

没有神奇的方法,你必须尝试,并使用webUI查看生成了多少任务。

唐阳晖
2023-03-14

您可以在dataframe上调用重新分区()来设置分区。您甚至可以在创建hive上下文后或通过传递给spark-提交jar来设置spark.sql.shuffle.partitions此属性:

spark-submit .... --conf spark.sql.shuffle.partitions=100

或者

dataframe.repartition(100)
丌官积厚
2023-03-14

默认情况下,它分为 200 个集。您可以通过在sql context sqlContext.sql(“set spark.sql.shuffle.partitions=10”)中使用set命令来更改它;。但是,您需要根据数据特征谨慎设置。

 类似资料:
  • 有人能解释一下将为Spark Dataframe创建的分区数量吗。 我知道对于RDD,在创建它时,我们可以提到如下分区的数量。 但是对于创建时的Spark数据帧,看起来我们没有像RDD那样指定分区数量的选项。 我认为唯一的可能性是,在创建数据帧后,我们可以使用重新分区API。 有人能告诉我在创建数据帧时,我们是否可以指定分区的数量。

  • 当在火花中进行连接时,或者通常是对于随机操作,我可以设置分区的最大数量,我希望火花在其中执行此操作。 按留档: 火花sql。洗牌分区200配置在为联接或聚合洗牌数据时要使用的分区数。 如果我想减少每个任务中必须完成的工作量,我必须估计数据的总大小并相应地调整此参数(分区越多,意味着单个任务中完成的工作量越少,但任务越多)。 我想知道,我能告诉spark根据数据量简单地调整分区的数量吗?一、 e.在

  • null 非常感谢任何指向文档或非常基本的示例的指针。

  • 问题内容: 我需要更改单个数据库的时区吗? 我知道我们可以在WHM中更改时区(我们正在使用hostgator的专用服务器),但是服务器上运行的大量旧版软件中有很多+6小时的编码(即服务器时区为CST,需要GMT时间,因此以前的开发人员会在代码中手动更改日期/时间- 很糟糕!)。 我现在正在开发一个新软件,并希望将其全部保存在GMT中,我知道我可以使用date_default_timezone_se

  • 查询示例: 典型错误消息: 处理语句时出错:失败:执行错误,从org.apache.hadoop.hive.ql.exec.mr.MapredTask返回代码2 问题2:当我运行命令?我是否只运行相同的命令,但使用STRING而不是bigint?**完整错误消息:**

  • 问题概要:假设我有300 GB的数据正在AWS中的EMR集群上用火花处理。这些数据有三个属性,用于在Hive中使用的文件系统上进行分区:日期、小时和(比方说)另一个。我想以最小化写入文件数量的方式将此数据写入fs。 我现在正在做的是获取日期、小时、另一个时间的不同组合,以及有多少行构成组合的计数。我将它们收集到驱动程序上的列表中,并遍历列表,为每个组合构建一个新的DataFrame,使用行数重新分