我有两个包含两列的DataFrames
>
<code>df1</code>,带有架构<code>(键1:长,值)</code>
<code>df2</code>,带架构<code>(键2:数组[Long],值)</code>
我需要在键列上连接这些DataFrames(查找key1
和key2
中的值之间的匹配值)。但问题是它们的类型不同。有办法做到这一点吗?
您可以转换key 1和key 2的类型,然后使用包含函数,如下所示。
val df1 = sc.parallelize(Seq((1L,"one.df1"),
(2L,"two.df1"),
(3L,"three.df1"))).toDF("key1","Value")
DF1:
+----+---------+
|key1|Value |
+----+---------+
|1 |one.df1 |
|2 |two.df1 |
|3 |three.df1|
+----+---------+
val df2 = sc.parallelize(Seq((Array(1L,1L),"one.df2"),
(Array(2L,2L),"two.df2"),
(Array(3L,3L),"three.df2"))).toDF("key2","Value")
DF2:
+------+---------+
|key2 |Value |
+------+---------+
|[1, 1]|one.df2 |
|[2, 2]|two.df2 |
|[3, 3]|three.df2|
+------+---------+
val joinedRDD = df1.join(df2, col("key2").cast("string").contains(col("key1").cast("string")))
JOIN:
+----+---------+------+---------+
|key1|Value |key2 |Value |
+----+---------+------+---------+
|1 |one.df1 |[1, 1]|one.df2 |
|2 |two.df1 |[2, 2]|two.df2 |
|3 |three.df1|[3, 3]|three.df2|
+----+---------+------+---------+
做到这一点的最佳方法(也是不需要对数据帧进行任何转换或分解的方法)是使用如下所示的< code > array _ contains spark SQL表达式。
import org.apache.spark.sql.functions.expr
import spark.implicits._
val df1 = Seq((1L,"one.df1"), (2L,"two.df1"),(3L,"three.df1")).toDF("key1","Value")
val df2 = Seq((Array(1L,1L),"one.df2"), (Array(2L,2L),"two.df2"), (Array(3L,3L),"three.df2")).toDF("key2","Value")
val joinedRDD = df1.join(df2, expr("array_contains(key2, key1)")).show
+----+---------+------+---------+
|key1| Value| key2| Value|
+----+---------+------+---------+
| 1| one.df1|[1, 1]| one.df2|
| 2| two.df1|[2, 2]| two.df2|
| 3|three.df1|[3, 3]|three.df2|
+----+---------+------+---------+
请注意,您不能直接使用org.apache.spark.sql.functions.array_contains
函数,因为它要求第二个参数是文字,而不是列表达式。
并将其应用于数据表的一列--这是我希望这样做的: 我还没有找到任何简单的方法,正在努力找出如何做到这一点。一定有一个更简单的方法,比将数据rame转换为和RDD,然后从RDD中选择行来获得正确的字段,并将函数映射到所有的值,是吗?创建一个SQL表,然后用一个sparkSQL UDF来完成这个任务,这更简洁吗?
我想过滤掉具有“c2”列前3个字符的记录,无论是“MSL”还是“HCP”。 所以输出应该如下所示。 有谁能帮忙吗? 我知道df。过滤器($c2.rlike(“MSL”))--用于选择记录,但如何排除记录? 版本:Spark 1.6.2 Scala:2.10
我想在spark中读取一个CSV,将其转换为DataFrame,并使用将其存储在HDFS中 在Apache Spark中将CSV文件加载为DataFrame的正确命令是什么?
我正在尝试使用Databricks的spark-csv2.10依赖关系将一个数据帧写入到HDFS的*.csv文件。依赖关系似乎可以正常工作,因为我可以将.csv文件读入数据帧。但是当我执行写操作时,我会得到以下错误。将头写入文件后会出现异常。 当我将查询更改为时,write工作很好。 有谁能帮我一下吗? 编辑:根据Chandan的请求,这里是的结果
我有一个 功能,请告诉我是否有任何解决方法。 谢谢你。!
我正在尝试在PySpark中为两个数据框(df1和df2)创建自定义连接(类似于此),代码如下所示: 我得到的错误消息是: 有没有办法编写一个可以处理来自两个单独数据帧的列的 PySpark UDF?