问题概要:假设我有300 GB的数据正在AWS中的EMR集群上用火花处理。这些数据有三个属性,用于在Hive中使用的文件系统上进行分区:日期、小时和(比方说)另一个。我想以最小化写入文件数量的方式将此数据写入fs。
我现在正在做的是获取日期、小时、另一个时间的不同组合,以及有多少行构成组合的计数。我将它们收集到驱动程序上的列表中,并遍历列表,为每个组合构建一个新的DataFrame,使用行数重新分区该DataFrame以估计文件大小,并使用DataFrameWriter将文件写入磁盘,. orc
完成它。
我们出于组织原因不使用镶木地板。
这种方法工作得相当好,解决了使用Hive而不是Spark的下游团队看不到大量文件导致的性能问题的问题。例如,如果我取出整个300 GB数据帧,用1000个分区(在spark中)和相关的列进行重新分区,并将其转储到磁盘,所有这些都是并行转储的,整个过程大约需要9分钟。但是,对于较大的分区,这会获得多达1000个文件,这会破坏Hive的性能。或者它破坏了某种性能,老实说,不是100%确定是什么。我刚刚被要求尽可能减少文件数量。使用我正在使用的方法,我可以将文件保持为我想要的任何大小(无论如何相对接近),但是没有并行性,并且需要大约45分钟来运行,主要是等待文件写入。
在我看来,由于某个源行和某个目标行之间存在一对一的关系,并且由于我可以将数据组织到不重叠的“文件夹”(Hive的分区)中,我应该能够以这样一种方式组织我的代码/DataFrames,即我可以要求spark并行编写所有目标文件。有没有人对如何应对这种情况提出建议?
我测试过的东西不起作用:
>
使用scala并行集合来启动写入。无论Spark对DataFrames做了什么,它都没有很好地分离任务,一些机器遇到了大量的垃圾回收机制问题。
DataFrame . map——我试图映射唯一组合的数据帧,并从那里开始写入,但是无法从那个< code>map中访问我实际需要的数据的数据帧——在executor上,data frame引用为空。
DataFrame.map分区-一个非初学者,不能想出任何想法来做什么,我想从内部map的分区
“分区”一词在这里也不是特别有用,因为它既指按某些标准划分数据的火花的概念,也指在磁盘上为Hive组织数据的方式。我想我很清楚上面的用法。所以,如果我想一个完美的解决方案来解决这个问题,那就是我可以创建一个DataFrame,它有1000个分区,基于这三个属性进行快速查询,然后从中创建另一个DataFrame集合,每个DataFrame都有这些属性的唯一组合,重新分区(在spark中,但对于Hive),分区的数量与它包含的数据的大小相适应。大多数DataFrame将有1个分区,少数将有多达10个分区。文件应为~3GB,并且我们的EMR集群的RAM比每个执行器的RAM多,因此我们不应该看到这些“大”分区的性能受到影响。
创建数据帧列表并重新分区每个数据帧后,我可以要求 spark 将它们全部并行写入磁盘。
火花中可能有这样的东西吗?
有一件事我在概念上不清楚:说我有
val x=spark.sql("从源代码中选择*")
和
valy=x.where(s“date=$date and hour=$hour and anotherAttr=$anotheratr”)
和
val z = x.where(s“date=$date and hour=$hour and otherAttr=$anotherAttr 2”)
< code>y与< code>z在多大程度上是不同的数据帧?如果我重新划分< code>y,那么洗牌对< code>z和< code>x有什么影响?
这个声明:
我将它们收集到驱动程序上的一个列表中,并遍历该列表,为每个组合构建一个新的DataFrame,使用行数对DataFrame进行重新分区,以估计文件大小,然后使用DataFrameWriter将文件写入磁盘,.orc完成。
在火花方面完全脱离了光束。收集到驱动程序从来都不是一个好方法,卷和 OOM 问题以及方法中的延迟很高。
使用下面的方法,以简化并获得Spark的并行性,从而为您的老板节省时间和金钱:
df.repartition(cols...)...write.partitionBy(cols...)...
随机通过重新分区发生,分区
不会随机播放
。
就这么简单,利用Spark的默认并行性。
我们遇到了同样的问题(几乎),我们最终直接使用RDD(而不是DataFrames)并实现了我们自己的分区机制(通过扩展org.apache.spark.分区器)
细节:我们正在阅读来自Kafka的JSON消息。JSON应该按照customerid/date/more字段分组,并使用Parquet格式在Hadoop中编写,不要创建太多的小文件。
步骤是(简化版):a)从Kafka读取消息并将它们转换为RDD[(GroupBy, Message)]的结构。GroupBy是一个案例类,包含用于分组的所有字段。
b)使用reduceByKeyLocal转换并获取每个组的度量映射(消息数/消息大小/等) - 例如Map[GroupBy,GroupByMetrics]
c)创建一个Group分区器,它使用以前收集的度量(以及一些输入参数,如所需的拼花大小等)来计算应该为每个GroupBy对象创建多少个分区。基本上,我们正在扩展org.apache.spark.分区器并覆盖数字分区和get分区(键:任何)
d) 我们使用先前定义的partitioner:newPartitionedRdd=RDD.partitionBy(ourCustomGroupByPartitioner)对a)中的RDD进行分区
e)使用两个参数调用spark.sparkContext.runJob:第一个参数是在d)处分区的RDD,第二个参数是一个自定义函数(func: (TaskContext,Iterator[T]),它将把从Iterator[T]获取的消息写入Hadoop/Parquet
假设我们有 100 mil 消息,按此分组
组1 - 2密耳
Group2-80密耳
Group3 - 18 mil,我们决定每个分区必须使用1.5 mil的消息来获得大于500MB的Parquet文件。我们最终将为组1提供2个分区,为组2提供54个分区,为组3提供12个分区。
问题内容: 我的问题是我有一个这样的表: c1 | c2 | c3 | c4是一个由|分隔的值。 我的最终结果应如下所示: 我该怎么做呢? 谢谢 问题答案: 这就是您可以执行的操作,使用管道将字符串拆分并使用spark函数爆炸数据 输出: 希望这可以帮助!
问题内容: 我已使用从IMDB收集信息并将其传输到MYSQL数据库的应用程序导入了一些数据。 似乎这些字段尚未标准化,并且在1个字段中包含许多值 例如: 有没有办法将这些值分开,然后将它们插入到另一个表中,而不重复呢? 我进行了一些谷歌搜索,发现我应该使用PHP处理此数据。但是我一点都不了解PHP。 无论如何,仅使用MYSQL即可转换此数据? 问题答案: 您可以使用存储过程,该过程使用游标来解
我有一个包含100个分区的df,在保存到HDFS之前,我想减少分区的数量,因为拼花文件太小了( 它可以工作,但将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时: 这个过程一点也不慢,每个文件2-3秒。 为什么?在减少分区数量时,合并不应该总是更快,因为它避免了完全洗牌吗? 背景: 我将文件从本地存储导入spark集群,并将生成的数据帧保存为拼花文件。每个文件大约100
我有下面的spark数据框架。 我必须将上面的数据帧列拆分为多个列,如下所示。 我尝试使用分隔符进行拆分;和限制。但是它也将主题拆分为不同的列。姓名和年龄被组合在一起成一列。我要求所有主题在一列中,只有姓名和年龄在单独的列中。 这在Pyspark有可能实现吗?
正在尝试从csv文件中读取数据,将每行拆分为各自的列。 但是,当某个列本身带有逗号时,我的正则表达式就失败了。 例如:a, b, c,"d, e, g,", f 我想要的结果是: 也就是5列。 下面是用逗号分隔字符串的正则表达式am ,(?=(?:“[^”]?(?:[^”])*)),(?=[^”](?:,),$) 但是它对少数字符串失败,而对其他字符串有效。 我想要的是,当我使用pyspark将c
我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢