我正在尝试使用Spark数据集API,但在进行简单连接时遇到了一些问题。
假设我有两个带有字段的数据集:date | value
,那么在DataFrame
的情况下,我的连接如下所示:
val dfA : DataFrame
val dfB : DataFrame
dfA.join(dfB, dfB("date") === dfA("date") )
但是,对于数据集,有一个。joinWith方法,但相同的方法不起作用:
val dfA : Dataset
val dfB : Dataset
dfA.joinWith(dfB, ? )
所需的参数是什么。是否连接?
对于上面的示例,您可以尝试以下操作:
为输出定义案例类
case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)
使用Seq(“key”)连接两个数据集,这将帮助您避免输出中出现两个重复的key列,这也将有助于在下一步中应用case类或提取数据
val joined = ds.join(ds2, Seq("key")).as[JoinOutput]
// res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]
结果将是平淡的:
joined.show
+---+-----+----+----+
|key|value|num1|num2|
+---+-----+----+----+
| 1| asdf| 7.7| 101|
| 2|34234| 1.2| 10|
+---+-----+----+----+
来自https://docs.cloud.databricks.com/docs/latest/databricks_guide/05Spark/1 Intro Datasets.html
看来你可以
dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )
要使用joinWith,首先必须创建一个数据集,很可能是其中的两个。要创建数据集,需要创建一个与html" target="_blank">模式匹配的case类,并调用DataFrame。as[T]其中,T是您的案例类。因此:
case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]
您还可以跳过case类并使用元组:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
如果您有另一个案例类/DF,如下图所示:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]
然后,虽然join和joinWith的语法相似,但结果不同:
df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// | 1| asdf| 1| 7.7| 101|
// | 2|34234| 2| 1.2| 10|
// +---+-----+---+----+----+
ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// | _1| _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+
如您所见,joinBy
将对象作为元组的一部分保持不变,而joinTo
将列展平为单个命名空间。(在上面的情况下会导致问题,因为列名“key”是重复的。)
奇怪的是,我不得不使用df。列(“键”)和df2。col(“key”)来创建连接ds和ds2的条件——如果只在任何一侧使用col(“key”),则不起作用,而ds。列(…) 不存在。使用原始df。然而,col(“key”)起到了关键作用。
我想要两个连接两个数据集DS1和DS2以获得DS3
使用Spark Dataset/DataFrame联接时,我面临长时间运行且OOM作业失败的问题。 以下是输入: ~10个不同大小的数据集,大部分是巨大的( 经过一番分析,我发现作业失败和缓慢的原因是歪斜键:当左侧有数百万条记录时,用连接键。 我用了一些蛮力的方法来解决这个问题,这里我想和大家分享一下。 如果您有更好的或任何内置的解决方案(针对常规Apache Spark),请与他人分享。
我需要根据一些共享的键列将许多数据帧连接在一起。对于键值RDD,可以指定一个分区程序,以便具有相同键的数据点被洗牌到相同的执行器,因此连接更有效(如果在之前有与洗牌相关的操作)。可以在火花数据帧或数据集上做同样的事情吗?
null null 为什么要使用UDF/UADF而不是map(假设map保留在数据集表示中)?
问题内容: 当我仔细观察时,我唯一提出的疑问是: 找不到适用于实际参数“ org.apache.spark.unsafe.types.UTF8String”的适用构造函数/方法;候选者为:“ public void sparkSQL.Tweet.setId(long)” 问题答案: 正如@ user9718686所写,id字段具有不同的类型:在json文件和类定义中。当您将其读入时,Spark会从
我在RDBMS中有几个数据库表,在当前的逻辑中,所有这些表都被连接起来并给出一些数据,基本上SQL被存储为视图的一部分。使用sqoop将数据推送到HDFS中,需要应用一些分组和按操作排序。 什么可能是连接数据集的最佳方式,如转储所需的列到内存(如df.registeredTempTable())和应用连接,或者我可以使用数据集连接,因为数据在HDFS的不同文件中可用。 问候阿南