当前位置: 首页 > 知识库问答 >
问题:

RDD API与UDF混合DataFrame API的性能影响

娄浩荡
2023-03-14

(Scala特定问题。)

虽然Spark docs鼓励尽可能使用DataFrame API,但如果DataFrame API不够,则通常会选择使用RDD API还是使用UDF。这两个备选方案之间是否存在固有的性能差异?

RDD和UDF的相似之处在于,它们都不能从催化剂和钨优化中获益。是否有其他开销,如果有,这两种方法之间是否有区别?

举一个具体的例子,假设我有一个DataFrame,其中包含一列具有自定义格式的文本数据(不适合正则表达式匹配)。我需要解析该列并添加一个包含结果标记的新向量列。

共有2个答案

长孙兴德
2023-03-14

(注意:我没有为此测量背衬)

对我来说,洗牌和(反)序列化是主要成本。但在这些之后,拥有干净的代码是最重要的。记住这一点:

使用RDD操作的主要缺点是需要将/序列化为完整的jvm对象。虽然使用udf可能只会(取消)序列化所需的列。请注意,这是在处理面向列的数据时,例如拼花,对于其他我不知道的数据格式,但预计在许多情况下两者都具有相似的perf。

因此,如果您的算法主要是过滤和混洗op,并且/或者可以简单地用dataframe op和local udf表示,您应该使用它们。然而,如果您的算法需要对许多列进行复杂的处理,那么最好预先支付反序列化费用,并在jvm对象上执行干净高效的scala代码。

因此,在我实现复杂数学算法的个人经验中,我通常将代码分为两个步骤:

  1. 纯dataframe op可以执行尽可能多的过滤、连接和分组操作。在极少数情况下,当需要特定的本地op时,我可以使用udf,而该op不能使用dataframe方法来表示(如果它只需要很少的列)
微生弘
2023-03-14

它们都不能从催化剂和钨优化中受益

这并不完全正确。虽然UDF没有从钨优化中受益(可以说简单的SQL转换也没有得到巨大的提升),但您仍然可以从Catalyst提供的执行计划优化中受益。让我们用一个简单的例子来说明这一点(注意:Spark 2.0和Scala。不要将此推断到早期版本,尤其是使用PySpark):

val f = udf((x: String) => x == "a")
val g = udf((x: Int) => x + 1)

val df = Seq(("a", 1), ("b", 2)).toDF

df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))
  .withColumn("_2", g($"_2"))
  .select($"_1")
  .explain

// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
//    +- *HashAggregate(keys=[_1#2], functions=[])
//       +- *Project [_1#2]
//          +- *Filter UDF(_1#2)
//             +- LocalTableScan [_1#2, _2#3]

执行计划向我们展示了几件事:

  • 在聚合之前已下推所选内容
  • 在聚合之前,投影被向下推,并有效地删除了第二个UDF调用

根据数据和管道,这几乎可以免费提供实质性的性能提升。

也就是说,RDD和UDF都需要在安全和不安全之间进行迁移,而后者的灵活性要低得多。尽管如此,如果您只需要一个简单的类似映射的行为,而不需要初始化昂贵的对象(如数据库连接),那么UDF就是一种选择。

在稍微复杂的场景中,您可以轻松下拉到通用Dataset并为确实需要访问自定义分区等一些低级功能的情况保留RDD

 类似资料:
  • 问题内容: 我试图了解什么是最佳实践,以及为什么要在不同情况下串联字符串文字和变量。例如,如果我有这样的代码 这是这样做的方式吗?从这篇文章中,我注意到Strings上的运算符创建了一个StringBuilder的新实例,连接了操作数,并返回了String转换,这似乎比仅仅调用还要多;所以如果这是真的,那是不可能的。但是那又如何呢?是否适合用于每个串联?或仅用于变量,文字可以追加吗? 应对这些情况

  • 在ORM映射类上定义具有“混合”行为的属性。 “混合”是指属性在类级别和实例级别定义了不同的行为。 这个 hybrid 扩展提供了一种特殊形式的方法修饰器,大约有50行代码,几乎不依赖于其他的sqlacalchemy。理论上,它可以与任何基于描述符的表达式系统一起工作。 考虑映射 Interval ,表示整数 start 和 end 价值观。我们可以在生成类级SQL表达式的映射类上定义更高级别的函

  • 首先非常抱歉Tinker没有按期内测,这主要因为开源的代码需要通过公司内部审核与评测,这项工作大约还需要一个月左右。当前Tinker已经在公司内部开源,我们会努力让它以更完善的姿态与大家见面。 大约在六月底,Tinker在微信全量上线了一个补丁版本,随即华为反馈在Android N上微信无法启动。冷汗冒一地,Android N又搞了什么东东?为什么与instant run保持一致的补丁方式也跪了?

  • 当以下转换在将RDD写入文件之前执行时,它们之间有什么区别? 聚结(1,洗牌=true) 合并(1,洗牌=假) 代码示例: 它与collect()相比如何?我完全知道Spark save方法将以HDFS风格的结构存储它,但我更感兴趣的是collect()和shuffled/non shuffled coalesce()的数据分区方面。

  • 问题内容: 在Java中,这样做并使用相同的锁定机制? 我的猜测是“不”,但我希望是错的。 例: 想象一下,线程1和线程2都可以访问: 线程1运行: 线程2运行: 假设线程1首先到达其部分,然后在线程1完成之前到达线程2:线程2将等待线程1离开该块,还是继续运行? 问题答案: 不,即使线程1 在同一线程上,线程2也可以。这是文档必须说的: 请注意,Lock实例只是普通对象,它们本身可以用作同步语句

  • 问题内容: 我是使用属性的新手,因此我进行了如下所示的简单测试。在测试中,我创建了两个类“ Test1”和“ Test2”,每个类都持有一个值。我正在尝试使用属性来控制对伪隐藏的“ val”属性的访问。当前测试不限制“ val”属性的任何输入或输出,因为该程序仅是概念证明。下面显示的两个测试类产生相同的结果,并被认为代表了构造属性的不同方法。我要引用的属性的示例使用在python docs上找到。