当前位置: 首页 > 知识库问答 >
问题:

重新分区和合并未按预期工作

赫连冠玉
2023-03-14

我有一个2.5 GB的数据帧。分区数为5000。我正在尝试重新分区,然后将其持久化。但是在我读取持久化数据之后,分区的数量正在改变。

val df = spark.read.parquet(".../oldPartition") // df has 5000 partitions
df.repartition(300).write.parquet(".../newPartition")

df.read.parquet(".../newPartition") // This doesn't have 300 partitions as expected.

我甚至尝试使用coalesce,但没有运气。有人能解释一下发生了什么吗?

共有2个答案

薛泰
2023-03-14

唯一的办法是这样的:

在写入时使用重新分区和分区By:

...
val df2 = df.repartition(7, $"_2")
//df2.rdd.glom().map(_.length).collect()
df2.write.partitionBy("_2").csv("/SOQ2")
...

在随后的阅读中:

...
val df3 = spark.read.csv("/SOQ2") 
val df4 = df3.repartition(7, $"_2")
// this val df4 = df3.repartition(7) gives different distribution
df4.rdd.glom().map(_.length).collect()
...

然后填充分区等是相同的,但必须在运行时以这种方式强制执行;除非您使用bucketBy

丰智
2023-03-14

写数据时,为每个分区Spark写一个单独的文件。因此,您可以检查一下,在您写入重新分区的数据帧的地方,是否真的有300个拼花文件。

但是读取的分区数是另一回事。它受到许多因素的影响,例如:

  • 您正在读取的文件数
  • 您正在读取的文件大小
  • spark.default.parallelism
  • spark.files.max分区字节

您可以查看他们的源代码以了解更多详细信息

 类似资料:
  • 我已经编写了如下所示的重新调度代码。尽管时间表上写着“每120秒重复一次”,但在结束UpdateQuartzJobTrigger方法后,Execute()方法会立即被调用。

  • 我目前正试图研究并发性,特别是“volatile”关键字。 通过声明计数器变量为volatile,所有对计数器变量的写入都将立即写回主存。此外,计数器变量的所有读取都将直接从主存中读取。下面是计数器变量的volatile声明的外观 和 当线程写入易失性变量时,不仅易失性变量本身会被写入主内存。此外,线程在写入易失性变量之前更改的所有其他变量也会刷新到主内存中。当一个线程读取一个易失性变量时,它还将

  • 我需要使用“;”拆分字符串作为分隔符,如果字符串中所有的字段都被填充了,这很好,但是如果一些字段没有被填充,比如< code > string . split(" A;b;c;;;")不起作用...对于这个字符串,我预计输出将 [0]=A [1] =B [2]=C [3]='' [4]='' [5]='' ,但输出只有前三个字段 [0]=A [1] =B [3]=C ...未创建其他字段 如何解决

  • 我正在使用spring Roo并希望访问Controller类中的一个bean,该类在ApplicationContext.xml中具有以下配置: 配置类本身是: 在我的Controller中,我认为一个简单的Autowired注释应该可以完成这项工作 在启动过程中,spring在setSkipWeeks方法中打印消息。不幸的是,每当我在控制器中调用config.getSkipWeeks()时,它

  • 当我运行以下程序时,它只打印 然而,从Java 8的equalsIgnoreCase文档中我们发现: 如果以下至少一项为真,则两个字符c1和c2被视为相同的忽略情况: •对每个字符应用java.lang.character.ToUpperCase(char)方法会产生相同的结果 所以我的问题是为什么这个程序不打印 在这两种操作中,都使用了大写字符。

  • 我试图使用来传输我根据前面的问题设置的自定义标头。 我在文件中读到... 我的属性包括: