当前位置: 首页 > 知识库问答 >
问题:

在Scala中使用Spark数据集执行类型化联接

萧明贤
2023-03-14

我喜欢Spark数据集,因为它们在编译时会给我带来分析错误和语法错误,还允许我使用getter而不是硬编码的名称/数字。大多数计算都可以通过DataSet的高级API完成。例如,通过访问Dataset类型对象的执行agg、select、sum、avg、map、filter或groupBy操作要比使用RDD行的数据字段简单得多。

但是其中缺少join操作,我读到我可以像这样执行join操作

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

我使用的是Spark 1.6.1和Scala 2.10

共有1个答案

谷梁向荣
2023-03-14

Spark SQL只能在联接条件基于相等运算符的情况下优化联接。这意味着我们可以分别考虑等价连接和非等价连接。

通过将数据集映射到(key,value)元组,执行基于key的联接,并重新塑造结果,可以以类型安全的方式实现Equijoin:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset

def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}

可以用关系代数运算符表示为R?S=?(R?S)并直接转换为代码

spark.conf.set("spark.sql.crossJoin.enabled", true)

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                         (p: (T, U) => Boolean) = {
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}
case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)

如果您不局限于Spark SQL APIframeless数据集提供了有趣的类型安全扩展(到目前为止,它只支持Spark2.0):

import frameless.TypedDataset

val typedPoints1 = TypedDataset.create(points1)
val typedPoints2 = TypedDataset.create(points2)

typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))

datasetAPI在1.6中不稳定,所以我认为在1.6中使用它没有意义。

当然,这种设计和描述性的名称是不必要的。您可以轻松地使用type class将此方法隐式添加到DataSet中,并且与内置签名没有冲突,因此两者都可以称为JoinWith

 类似资料:
  • 我正在寻找一种方法,将我的大型spark数据集划分为组/批,并在某些函数中处理这组行。所以基本上,这组行应该被输入到我的函数中,输出是我的单位,因为我不想聚合或更新输入记录,只是执行一些计算。 为了理解,假设我有以下输入。 假设我需要按col1和col2分组,这将给我以下分组 (1, A,1),(1, A,4),(1, A,5)--- (1,B,2)--- (1,C,3),(1,C,6)--- (

  • 我想知道在使用TypeScript时,是否有任何工具或技术用于对对象数据进行低级验证。一个例子是HTTP服务上的POST请求的JSON主体。通常,我为预期的数据创建了一个接口,然后将数据强制转换到该接口,但我知道这是肤浅的。 结合静态和运行时类型检查以提高开发效率

  • 问题内容: 我想为我正在执行的映射操作在DataSet中为Row类型编写一个编码器。本质上,我不了解如何编写编码器。 以下是地图操作的示例: 我知道,编码器需要编写如下字符串,而不是字符串: 但是,我不了解编码器中的clsTag(),并且我试图找到一个可以演示类似内容的运行示例(即,用于行类型的编码器) 编辑- 这不是所提问题的副本:尝试将数据框行映射到更新行时出现编码器错误,因为答案涉及在Spa

  • 我有一个以下数据(alldata),它有SQL查询和视图名称。 我已经拆分并正确地将其分配给诱惑(alldata) 当我尝试执行查询并从中注册tempview或表时,它显示空指针错误。但是当我注释掉spark时,PRINTLN显示了表中的所有值。sql语句。 但是当我用spark.sql执行它时,它会显示以下错误,请帮助我出错的地方。 19/12/09 02:43:12错误执行器:在阶段4.0任务

  • 主要内容:实例Scala 与 Java有着相同的数据类型,下表列出了 Scala 支持的数据类型: 数据类型 描述 Byte 8位有符号补码整数。数值区间为 -128 到 127 Short 16位有符号补码整数。数值区间为 -32768 到 32767 Int 32位有符号补码整数。数值区间为 -2147483648 到 2147483647 Long 64位有符号补码整数。数值区间为 -9223372036

  • Scala 与 Java有着相同的数据类型,下表列出了 Scala 支持的数据类型: 数据类型 描述 Byte 8位有符号补码整数。数值区间为 -128 到 127 Short 16位有符号补码整数。数值区间为 -32768 到 32767 Int 32位有符号补码整数。数值区间为 -2147483648 到 2147483647 Long 64位有符号补码整数。数值区间为 -9223372036