当前位置: 首页 > 知识库问答 >
问题:

如何定义DataFrame的分区?

柳英资
2023-03-14

我已经开始在Spark 1.4.0中使用Spark SQL和DataFrames。我想在Scala中定义一个DataFrames上的自定义分区器,但不知道如何做到这一点。

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

但我找不到一种方法来定义这一点。DataFrame类有一个名为'repartition(Int)'的方法,您可以在其中指定要创建的分区数。但是我没有看到任何方法可以为数据帧定义自定义分区器,比如可以为RDD指定的分区器。

源数据存储在Parquet中。我确实看到,在向Parquet编写DataFrame时,可以指定要分区的列,所以我可以告诉Parquet按“Account”列分区它的数据。但是可能有数百万个帐户,如果我正确理解了Parquet,它会为每个帐户创建一个不同的目录,所以这听起来不是一个合理的解决方案。

是否有一种方法可以让Spark对这个数据目录进行分区,以便一个帐户的所有数据都在同一个分区中?

共有1个答案

茅鸿宝
2023-03-14

SPARK-22614公开了范围分区。

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389在数据源API V2中公开了外部格式分区。

在Spark>=1.6中,可以使用按列分区进行查询和缓存。参见:使用重新分区方法的SPARK-11410和SPARK-4849:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

RDDsSparkdataset不同(包括dataset[Row],又名dataframe)目前还不能使用自定义分区器。您通常可以通过创建一个人工分区列来解决这个问题,但它不会给您同样的灵活性。

您可以做的一件事是在创建DataFrame之前对输入数据进行预分区

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

由于从rdd创建dataframe只需要一个简单的映射阶段,因此应该保留现有的分区布局*:

assert(df.rdd.partitions == partitioned.partitions)
sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

>

  • 重新分区是一个昂贵的过程。在典型的场景中,大多数数据必须被序列化、洗牌和反序列化。另一方面,可以从预分区数据中受益的操作数量相对较少,如果内部API未设计为利用该属性,则会受到进一步限制。

    • 在某些情况下加入,但需要内部支持,
    • 窗口函数调用匹配的分区器。同上,仅限于单一窗口定义。但是它已经在内部分区,所以预分区可能是多余的,
    • 使用分组的简单聚合--可以减少临时缓冲区**的内存占用,但总体成本要高得多。与groupbykey.mapvalues(_.reduce)(当前行为)和reducebykey(预分区)大致相同。在实践中不太可能有用。
    • 使用sqlcontext.cachetable进行数据压缩。由于它看起来使用的是运行长度编码,所以应用orderedrddfunctions.repartitionandsortwithinpartitions可以提高压缩比。

    性能高度依赖于密钥的分布。如果它是倾斜的,它将导致一个次优的资源利用。在最坏的情况下,根本不可能完成这项工作。

    JDBC数据源支持谓词参数。它可以使用如下:

    sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
    

    它为每个谓词创建一个JDBC分区。请记住,如果使用单个谓词创建的集合不是不相交的,您将在结果表中看到重复的集合。

    DataFrameWriter中的PartitionBy方法:

    SparkDataFrameWriter提供PartitionBy方法,该方法可用于在写入时对数据进行“分区”。它使用提供的列集在写入时分离数据

    val df = Seq(
      ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
    ).toDF("k", "v")
    
    df.write.partitionBy("k").json("/tmp/foo.json")
    

    对于基于键的查询,这将在读取时启用谓词下推:

    val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
    df1.where($"k" === "bar")
    

    但它不等同于dataframe.repartition。特别是像这样的聚合:

    val cnts = df1.groupBy($"k").sum()
    

    仍需要TungstenExchange:

    cnts.explain
    
    // == Physical Plan ==
    // TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
    // +- TungstenExchange hashpartitioning(k#90,200), None
    //    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
    //       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
    

    DataFrameWriter中的Bucketby方法(Spark>=2.0):

    Bucketby具有与PartitionBy类似的应用程序,但它仅适用于表(SaveStable)。Bucketing信息可用于优化联接:

    // Temporarily disable broadcast joins
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df.write.bucketBy(42, "k").saveAsTable("df1")
    val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
    df2.write.bucketBy(42, "k").saveAsTable("df2")
    
    // == Physical Plan ==
    // *Project [k#41, v#42, v2#47]
    // +- *SortMergeJoin [k#41], [k#46], Inner
    //    :- *Sort [k#41 ASC NULLS FIRST], false, 0
    //    :  +- *Project [k#41, v#42]
    //    :     +- *Filter isnotnull(k#41)
    //    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
    //    +- *Sort [k#46 ASC NULLS FIRST], false, 0
    //       +- *Project [k#46, v2#47]
    //          +- *Filter isnotnull(k#46)
    //             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
    

    *分区布局我只指一个数据分布。分区RDD不再有分区程序。**假设没有早期预测。如果聚合只覆盖很小的列子集,那么可能没有任何好处。

  •  类似资料:
    • 我一直在试验PySpark RDDs的分区和重新分区。 我注意到,当将一个小样本 RDD 从 2 个分区重新分区到 6 个分区时,只是添加了几个空部分。 现在,我想知道这是否也发生在我的真实数据中。 似乎我不能在较大的数据上使用glom()(带有192497行的df)。 因为当我尝试时,什么也没发生。不过这是有道理的,由此产生的印刷品将是巨大的...... 因此 我想打印每个分区,检查它们是否是空

    • 本文向大家介绍Django如何自定义分页,包括了Django如何自定义分页的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了Django自定义分页的具体代码,供大家参考,具体内容如下 稳扎稳打版 book.html 封装保存版 封装保存版 封装版使用指南 封装版对应的HTML参考 效果图如下:  以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教程。

    • 从Spark1.6开始,根据官方文档,我们不能向DataFrame添加特定的配置单元分区 我相信这很有效。在Spark1.6中有没有替代方案可以实现这一点? 根据我的理解,Spark1.6加载了所有分区,如果我筛选特定分区,它效率不高,它会击中内存并抛出GC(垃圾收集)错误,因为成千上万的分区被加载到内存中,而不是特定的分区。

    • 假设我有两个数据帧df1:col1 col2 col3 df2:col1 col2 col4 我想使用col1和col2连接两个数据帧,而不定义新的别名表名。 我不想做 df=df1.join(df2,(df1.col1==df2.col1) 所以最终的数据帧只有col1 col2 col3 col4 如何实现这一点?

    • 问题内容: 我正在尝试使我的对象可包裹。但是,我有自定义对象,这些对象具有ArrayList我制作的其他自定义对象的属性。 最好的方法是什么? 问题答案: 您可以在此处,此处(在此处获取代码)和此处找到一些示例。 您可以为此创建一个POJO类,但是您需要添加一些额外的代码来实现它Parcelable。看一下实现。 创建此类后,您可以Intent像这样轻松地传递此类的对象,并在目标活动中恢复该对象。

    • 主要内容:前记,1.自定义视图,2.自定义异常,3.自定义异常的原理前记 在前面的文章中, 表示了视图解析的原理和异常解析器的解析原理。 这篇通过如何自定义视图和自定义异常处理和自定义异常处理的原理进行说明。 这里说明一下, 自定义的视图和自定义的异常都是会代替容器默认的组件的, 异常还好说, 就是不符合就抛, 视图的话需要注意一下优先级, 可以在自定义的视图解析器上加上注解。 1.自定义视图 这里原理就是添加一个视图和视图解析器, 然后放入容器中, 最后访问相应

    • TensorFlow GraphDef based models (typically created via the Python API) may be saved in one of following formats: TensorFlow SavedModel Frozen Model Session Bundle Tensorflow Hub module All of above f

    • 在Elasticsearch中,我想用我的自定义分析器索引一些字段。因此,首先,我将分析器添加到其他配置中 Liferay公司- 其他索引配置 覆盖类型映射 在Liferay Elasticsearch中添加此属性后,我重置了索引,重新启动了Liferay。Portal使用我的映射和分析器正确创建了一个新索引。然后我重新索引了我的文档。当我在Elasticsearch中搜索某物时,它会显示预期的结