当前位置: 首页 > 知识库问答 >
问题:

将spark数据拆分为分区并将这些分区并行写入磁盘

周鸿运
2023-03-14

问题概要:假设我有300 GB的数据正在AWS中的EMR集群上用火花处理。这些数据有三个属性,用于在Hive中使用的文件系统上进行分区:日期、小时和(比方说)另一个。我想以最小化写入文件数量的方式将此数据写入fs。

我现在正在做的是获取日期、小时、另一个时间的不同组合,以及有多少行构成组合的计数。我将它们收集到驱动程序上的列表中,并遍历列表,为每个组合构建一个新的DataFrame,使用行数重新分区该DataFrame以估计文件大小,并使用DataFrameWriter将文件写入磁盘,. orc完成它。

我们出于组织原因不使用镶木地板。

这种方法工作得相当好,解决了使用Hive而不是Spark的下游团队看不到大量文件导致的性能问题的问题。例如,如果我取出整个300 GB数据帧,用1000个分区(在spark中)和相关的列进行重新分区,并将其转储到磁盘,所有这些都是并行转储的,整个过程大约需要9分钟。但是,对于较大的分区,这会获得多达1000个文件,这会破坏Hive的性能。或者它破坏了某种性能,老实说,不是100%确定是什么。我刚刚被要求尽可能减少文件数量。使用我正在使用的方法,我可以将文件保持为我想要的任何大小(无论如何相对接近),但是没有并行性,并且需要大约45分钟来运行,主要是等待文件写入。

在我看来,由于某个源行和某个目标行之间存在一对一的关系,并且由于我可以将数据组织到不重叠的“文件夹”(Hive的分区)中,我应该能够以这样一种方式组织我的代码/DataFrames,即我可以要求spark并行编写所有目标文件。有没有人对如何应对这种情况提出建议?

我测试过的东西不起作用:

>

  • 使用scala并行集合来启动写入。无论Spark对DataFrames做了什么,它都没有很好地分离任务,一些机器遇到了大量的垃圾回收机制问题。

    DataFrame . map——我试图映射唯一组合的数据帧,并从那里开始写入,但是无法从那个< code>map中访问我实际需要的数据的数据帧——在executor上,data frame引用为空。

    DataFrame.map分区-一个非初学者,不能想出任何想法来做什么,我想从内部map的分区

    “分区”一词在这里也不是特别有用,因为它既指按某些标准划分数据的火花的概念,也指在磁盘上为Hive组织数据的方式。我想我很清楚上面的用法。所以,如果我想一个完美的解决方案来解决这个问题,那就是我可以创建一个DataFrame,它有1000个分区,基于这三个属性进行快速查询,然后从中创建另一个DataFrame集合,每个DataFrame都有这些属性的唯一组合,重新分区(在spark中,但对于Hive),分区的数量与它包含的数据的大小相适应。大多数DataFrame将有1个分区,少数将有多达10个分区。文件应为~3GB,并且我们的EMR集群的RAM比每个执行器的RAM多,因此我们不应该看到这些“大”分区的性能受到影响。

    创建数据帧列表并重新分区每个数据帧后,我可以要求 spark 将它们全部并行写入磁盘。

    火花中可能有这样的东西吗?

    有一件事我在概念上不清楚:说我有

    val x=spark.sql("从源代码中选择*")

    valy=x.where(s“date=$date and hour=$hour and anotherAttr=$anotheratr”)

    val z = x.where(s“date=$date and hour=$hour and otherAttr=$anotherAttr 2”)

    < code>y与< code>z在多大程度上是不同的数据帧?如果我重新划分< code>y,那么洗牌对< code>z和< code>x有什么影响?

  • 共有2个答案

    尉迟鸿熙
    2023-03-14

    这个声明:

    我将它们收集到驱动程序上的一个列表中,并遍历该列表,为每个组合构建一个新的DataFrame,使用行数对DataFrame进行重新分区,以估计文件大小,然后使用DataFrameWriter将文件写入磁盘,.orc完成。

    在火花方面完全脱离了光束。收集到驱动程序从来都不是一个好方法,卷和 OOM 问题以及方法中的延迟很高。

    使用下面的方法,以简化并获得Spark的并行性,从而为您的老板节省时间和金钱:

    df.repartition(cols...)...write.partitionBy(cols...)...
    

    随机通过重新分区发生,分区不会随机播放

    就这么简单,利用Spark的默认并行性。

    西门智
    2023-03-14

    我们遇到了同样的问题(几乎),我们最终直接使用RDD(而不是DataFrames)并实现了我们自己的分区机制(通过扩展org.apache.spark.分区器)

    细节:我们正在阅读来自Kafka的JSON消息。JSON应该按照customerid/date/more字段分组,并使用Parquet格式在Hadoop中编写,不要创建太多的小文件。

    步骤是(简化版):a)从Kafka读取消息并将它们转换为RDD[(GroupBy, Message)]的结构。GroupBy是一个案例类,包含用于分组的所有字段。

    b)使用reduceByKeyLocal转换并获取每个组的度量映射(消息数/消息大小/等) - 例如Map[GroupBy,GroupByMetrics]

    c)创建一个Group分区器,它使用以前收集的度量(以及一些输入参数,如所需的拼花大小等)来计算应该为每个GroupBy对象创建多少个分区。基本上,我们正在扩展org.apache.spark.分区器并覆盖数字分区和get分区(键:任何)

    d) 我们使用先前定义的partitioner:newPartitionedRdd=RDD.partitionBy(ourCustomGroupByPartitioner)对a)中的RDD进行分区

    e)使用两个参数调用spark.sparkContext.runJob:第一个参数是在d)处分区的RDD,第二个参数是一个自定义函数(func: (TaskContext,Iterator[T]),它将把从Iterator[T]获取的消息写入Hadoop/Parquet

    假设我们有 100 mil 消息,按此分组

    组1 - 2密耳

    Group2-80密耳

    Group3 - 18 mil,我们决定每个分区必须使用1.5 mil的消息来获得大于500MB的Parquet文件。我们最终将为组1提供2个分区,为组2提供54个分区,为组3提供12个分区。

     类似资料:
    • 问题内容: 我的问题是我有一个这样的表: c1 | c2 | c3 | c4是一个由|分隔的值。 我的最终结果应如下所示: 我该怎么做呢? 谢谢 问题答案: 这就是您可以执行的操作,使用管道将字符串拆分并使用spark函数爆炸数据 输出: 希望这可以帮助!

    • 问题内容: 我已使用从IMDB收集信息并将其传输到MYSQL数据库的应用程序导入了一些数据。 似乎这些字段尚未标准化,并且在1个字段中包含许多值 例如: 有没有办法将这些值分开,然后将它们插入到另一个表中,而不重复呢? 我进行了一些谷歌搜索,发现我应该使用PHP处理此数据。但是我一点都不了解PHP。 无论如何,仅使用MYSQL即可转换此​​数据? 问题答案: 您可以使用存储过程,该过程使用游标来解

    • 我有一个包含100个分区的df,在保存到HDFS之前,我想减少分区的数量,因为拼花文件太小了( 它可以工作,但将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时: 这个过程一点也不慢,每个文件2-3秒。 为什么?在减少分区数量时,合并不应该总是更快,因为它避免了完全洗牌吗? 背景: 我将文件从本地存储导入spark集群,并将生成的数据帧保存为拼花文件。每个文件大约100

    • 我有下面的spark数据框架。 我必须将上面的数据帧列拆分为多个列,如下所示。 我尝试使用分隔符进行拆分;和限制。但是它也将主题拆分为不同的列。姓名和年龄被组合在一起成一列。我要求所有主题在一列中,只有姓名和年龄在单独的列中。 这在Pyspark有可能实现吗?

    • 正在尝试从csv文件中读取数据,将每行拆分为各自的列。 但是,当某个列本身带有逗号时,我的正则表达式就失败了。 例如:a, b, c,"d, e, g,", f 我想要的结果是: 也就是5列。 下面是用逗号分隔字符串的正则表达式am ,(?=(?:“[^”]?(?:[^”])*)),(?=[^”](?:,),$) 但是它对少数字符串失败,而对其他字符串有效。 我想要的是,当我使用pyspark将c

    • 我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢