当前位置: 首页 > 知识库问答 >
问题:

是否有一种方法可以将parquet分区下的所有文件读到单个spark分区上?

慕容聪
2023-03-14
userData/
    partitionKey=1/
        part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
        part-00044-cf737804-90ea-4c37-94f8-9aa016f6953b.c000.snappy.parquet
    partitionKey=2/
        part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
    partitionKey=3/
        part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
    null

在读取数据时,我希望1个用户的所有数据都落入同一个spark分区。单个spark分区可以有1个以上的用户,但它应该有所有这些用户的所有行。

目前,我使用的是:sparksession.read.parquet(“../userdata”).repartition(200,col(“userid”))

(还尝试了使用自定义分区器的partitionBy;操作顺序:DataFrame->RDD->keyedrdd->partitionBy->RDD->DataFrame;在partitionBy之前,有一个反序列化到对象的步骤,该步骤可以分解洗牌写入)

共有1个答案

闾丘才哲
2023-03-14

sparksession.read.parquet应该根据文件路径自动推断分区信息。您可以在这里找到更多信息

如果文件路径为:

userData/
    UserId=1/
        part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
        part-00044-cf737804-90ea-4c37-94f8-9aa016f6953b.c000.snappy.parquet
    UserId=2/
        part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
    UserId=3/
        part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet

当调用sparksession.read.parquet(“/path/to/userdata”)时,它将按userid进行分区。

 类似资料:
  • 但是我得到了这个错误: sparkException:由于阶段失败而中止作业:阶段0.0中的任务236失败4次,最近的失败:阶段0.0中丢失任务236.3(TID 287,server,executor 17):org.apache.hadoop.security.AccessControlException:权限被拒绝:user=user,access=read,inode=“/path-to-

  • /tmp/data/myfile1.csv,/tmp/data/myfile2.csv,/tmp/data.myfile3.csv,/tmp/datamyfile4.csv 我希望将这些文件读入Spark DataFrame或RDD,并且希望每个文件都是DataFrame的一个解析。我怎么能这么做?

  • 我可以使用这个问题中的技巧来强制初始分区和最终分区之间的关系,但是Spark不知道每个原始分区的所有内容都将转移到一个特定的新分区。因此,它不能优化掉洗牌,而且它的运行速度比慢得多。

  • 问题内容: 我有一个带有2个分区的Kafka集群。我一直在寻找一种将分区数增加到3的方法。但是,我不想丢失该主题中的现有消息。我尝试停止Kafka,修改文件以将分区数增加到3,然后重新启动Kafka。但是,这似乎并没有改变任何东西。使用Kafka ,我仍然看到它仅使用2个分区。我正在使用的Kafka版本是0.8.2.2。在0.8.1版中,曾经有一个名为的脚本,我想可能可以解决问题。但是,我在0.8

  • 我正在使用Spark2.0,我想知道,是否可以列出特定配置单元表的所有文件?如果是这样,我可以直接使用spark增量地更新这些文件。如何向配置单元表添加新分区?有没有关于蜂巢转移瘤的api我可以从Spark使用? 有什么方法可以获得映射dataframe的内部配置单元函数吗 我的主要理由是对表进行增量更新。现在,我知道的唯一方法是SQL+,这不是很有效,因为他将覆盖所有表,而我主要感兴趣的是对某些

  • 我有一个具有如下模式的dataframe: