当前位置: 首页 > 知识库问答 >
问题:

无法在spark中将有序数据写入拼花

黎同
2023-03-14

我正在使用Apache Spark生成拼花文件。我可以按日期划分它们,没有问题,但在内部我似乎无法按正确的顺序排列数据。

在处理过程中,顺序似乎丢失了,这意味着拼花地板元数据是不正确的(具体来说,我想确保拼花地板行组反映排序顺序,以便特定于我的用例的查询可以通过元数据有效地过滤)。

考虑以下示例

// note: hbase source is a registered temp table generated from hbase
val transformed = sqlContext.sql(s"SELECT  id, sampleTime, ... , toDate(sampleTime) as date FROM hbaseSource")

// Repartion the input set by the date column (in my source there should be 2 distinct dates)
val sorted = transformed.repartition($"date").sortWithinPartitions("id", "sampleTime")

sorted.coalesce(1).write.partitionBy("date").parquet(s"/outputFiles")

使用这种方法,我确实得到了正确的拼花地板分区结构(按日期)。更好的是,对于每个日期分区,我看到一个大的拼花文件。

 /outputFiles/date=2018-01-01/part-00000-4f14286c-6e2c-464a-bd96-612178868263.snappy.parquet

但是,当我查询文件时,我看到内容乱序。具体来说,“乱序”似乎更像是几个有序的数据帧分区已合并到文件中。

拼花行组元数据显示排序后的字段实际上是重叠的(例如,一个特定的id可能位于许多行组中):

id:             :[min: 54, max: 65012, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 827, max: 65470, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 1629, max: 61412, num_nulls: 0]

我希望数据在每个文件中正确排序,以便每个行组中的元数据最小值/最大值不重叠。

例如,这是我想看到的模式:

RG 0: id:             :[min: 54, max: 100, num_nulls: 0]
RG 1: id:             :[min: 100, max: 200, num_nulls: 0]

…其中RG=“行组”。如果我想要id=75,查询可以在一个行组中找到它。

我已经尝试了上述代码的许多变体。例如,有和没有合并(我知道合并是不好的,但我的想法是用它来防止洗牌)。我也尝试了排序而不是sortWithinPartitions(排序应该创建一个总排序,但会导致许多分区)。例如:

val sorted = transformed.repartition($"date").sort("id", "sampleTime") 
sorted.write.partitionBy("date").parquet(s"/outputFiles")

给了我 200 个文件,太多了,而且它们仍然没有正确排序。我可以通过调整随机大小来减少文件计数,但我本来希望在写入过程中按顺序处理排序(我的印象是写入不会随机输入)。我看到的顺序如下(为简洁起见,省略了其他字段):

+----------+----------------+
|id|      sampleTime|
+----------+----------------+
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|

这看起来像是交错排序的分区。因此,我认为重新分区在这里没有给我带来任何好处,而且sort似乎无法保持写入步骤的顺序。

我读到我想做的事情应该是可能的。我甚至尝试过Ryan Blue的演讲“Parquet性能调优:缺失的指南”中概述的方法(不幸的是,它位于OReily付费墙后面)。这涉及使用insertInto。在这种情况下,spark似乎使用了旧版本的parquet mr,这损坏了元数据,我不确定如何升级它。

我不确定我做错了什么。我的感觉是我误解了重新分区($“日期”)排序工作和/或交互的方式。

我会感谢任何想法。为这篇文章道歉。:)

编辑:还要注意,如果我在< code>transformed.sort("id "," sampleTime")上执行< code>show(n)操作,数据会正确排序。因此,问题似乎发生在写入阶段。如上所述,在写入过程中,排序的输出似乎被打乱了。

共有2个答案

滑文昌
2023-03-14

只是想法,合并后排序:"。合并(1)。sortWithinPartitions()"。预期结果看起来也很奇怪——为什么需要拼花地板中的有序数据?阅读后排序看起来更合适。

封永嘉
2023-03-14

问题是在保存文件格式时,Spark需要一些顺序,如果顺序不满足,Spark将在保存过程中根据要求对数据进行排序,并且会忘记您的排序。更具体地说,Spark需要这个顺序(这直接取自Spark 2.4.4的Spark源代码):

val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns

其中分区列是用于对数据进行分区的列。您没有使用存储桶,因此存储桶 IdExpressionsortColumns 在此示例中不相关,所需的排序将仅是分区列。因此,如果这是您的代码:

val sorted = transformed.repartition($"date").sortWithinPartitions("id", 
"sampleTime")

sorted.write.partitionBy("date").parquet(s"/outputFiles")

Spark将检查数据是否按date排序,而不是,因此Spark将忘记您的排序并按date对其进行排序。另一方面,如果您这样做:

val sorted = transformed.repartition($"date").sortWithinPartitions("date", "id", 
"sampleTime")

sorted.write.partitionBy("date").parquet(s"/outputFiles")

Spark 将再次检查数据是否按日期排序,这次是(满足要求),因此 Spark 将保留此顺序,并且在保存数据时不再诱导排序。所以我相信这样它应该有效。

 类似资料:
  • 我试图做一些非常简单的事情,我有一些非常愚蠢的挣扎。我想这一定与对火花的基本误解有关。我非常感谢任何帮助或解释。 我有一张非常大的桌子(~3 TB,~300毫米行,25k个分区),在s3中保存为拼花地板,我想给一些人一个很小的拼花文件样本。不幸的是,这要花很长时间才能完成,我不明白为什么。我尝试了以下方法: 然后当这不起作用时,我尝试了这个,我认为应该是一样的,但我不确定。(我添加了,以尝试调试。

  • 我有一个很大的数据框,我正在HDFS中写入拼花文件。从日志中获取以下异常: 谷歌对此进行了搜索,但找不到任何具体的解决方案。将推测设置为false:conf.Set(“spark.投机”,“false”) 但仍然没有帮助。它只完成了几个任务,生成了几个零件文件,然后突然因此错误而停止。 详细信息:Spark版本:2.3.1(这在1.6x中没有发生) 只有一个会话正在运行,这排除了不同会话访问同一位

  • 我设法将数据插入身份验证,但无法插入数据库: 代码: 错误: E/AndroidRuntime:致命异常:主进程:com。实例budgetingapp,PID:11507 java。lang.RuntimeException:在类androidx上找到了名称getText的冲突获取程序。appcompat。小装置。AppCompativeText位于com。谷歌。火基。消防商店。util。Cust

  • 我正在使用下面的代码片段来保存数据。它只会在同一分区文件夹下创建一个新的拼花地板文件。是否有任何方法可以将数据真正附加到现有的拼花地板文件中。所以,如果一天中有许多附件,我们就不会有多个文件? <代码>测向。聚结(1)。写模式(“追加”)。partitionBy(“paritionKey”)。拼花地板(“…\parquet\u file\u folder\”) 非常感谢你的帮助。

  • Spark版本:2.3 hadoop dist:azure Hdinsight 2.6.5平台:azure存储:BLOB 集群中的节点:6个执行器实例:每个执行器6个内核:每个执行器3个内存:8gb 试图通过同一存储帐户上的spark数据框将azure blob(wasb)中的csv文件(大小4.5g-280列,2.8 mil行)加载到拼花格式。我重新划分了大小不同的文件,即20、40、60、10

  • null 非常感谢任何指向文档或非常基本的示例的指针。