使用Spark SQL,我有两个数据帧,它们是从一个数据帧创建的,例如:
df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]
我想过滤df1,如果它的“路径”的一部分是df2中的任何路径。所以如果df1有路径为“a/b/c/d/e”的行,我会找出df2中是否有路径为“a/b/c”的行。SQL应该是这样
SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)
其中udf是用户定义的函数,用于缩短df1的原始路径。简单的解决方案是使用JOIN,然后过滤结果,但这很慢,因为df1和df2都有超过10mil的行。
我也尝试了下面的代码,但是首先我必须从df2创建广播变量
static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext
sqlContext.createDataFrame(df1.javaRDD().filter(
new Function<Row, Boolean>(){
@Override
public Boolean call(Row row) throws Exception {
String foo = shortenPath(row.getString(0));
return bdf.value().filter("path = '"+foo+"'").count()>0;
}
}
), myClass.class)
我遇到的问题是,当评估返回/执行df2过滤时,Spark卡住了。
我想知道如何使用两个数据帧来实现这一点。我真的不想加入。有什么想法吗?
编辑
在我的原始代码中,df1有别名“first”和df2“第二”。这个连接不是笛卡尔式的,它也不使用广播。
df1 = df1.as("first");
df2 = df2.as("second");
df1.join(df2, df1.col("first.path").
lt(df2.col("second.path"))
, "left_outer").
filter("isPrefix(first.path, second.path)").
na().drop("any");
isPrefix是udf
UDF2 isPrefix = new UDF2<String, String, Boolean>() {
@Override
public Boolean call(String p, String s) throws Exception {
//return true if (p.length()+4==s.length()) and s.contains(p)
}};
shortenPath-它剪切路径中的最后两个字符
UDF1 shortenPath = new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
String[] foo = s.split("/");
String result = "";
for (int i = 0; i < foo.length-2; i++) {
result += foo[i];
if(i<foo.length-3) result+="/";
}
return result;
}
};
记录示例。路径是唯一的。
a/a/a/b/c abc
a/a/a qwe
a/b/c/d/e abc
a/b/c qwe
a/b/b/k foo
a/b/f/a bar
...
所以df1代表
a/a/a/b/c abc
a/b/c/d/e abc
...
df2包括
a/a/a qwe
a/b/c qwe
...
作为使用子查询在中实现<code>的一种可能方式,可以使用<code>左半连接:
JavaSparkContext javaSparkContext = new JavaSparkContext("local", "testApp");
SQLContext sqlContext = new SQLContext(javaSparkContext);
StructType schema = DataTypes.createStructType(new StructField[]{
DataTypes.createStructField("path", DataTypes.StringType, false),
DataTypes.createStructField("value", DataTypes.StringType, false)
});
// Prepare First DataFrame
List<Row> dataForFirstDF = new ArrayList<>();
dataForFirstDF.add(RowFactory.create("a/a/a/b/c", "abc"));
dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc"));
dataForFirstDF.add(RowFactory.create("x/y/z", "xyz"));
DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema);
//
df1.show();
//
// +---------+-----+
// | path|value|
// +---------+-----+
// |a/a/a/b/c| abc|
// |a/b/c/d/e| abc|
// | x/y/z| xyz|
// +---------+-----+
// Prepare Second DataFrame
List<Row> dataForSecondDF = new ArrayList<>();
dataForSecondDF.add(RowFactory.create("a/a/a", "qwe"));
dataForSecondDF.add(RowFactory.create("a/b/c", "qwe"));
DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema);
// Use left semi join to filter out df1 based on path in df2
Column pathContains = functions.column("firstDF.path").contains(functions.column("secondDF.path"));
DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi");
//
result.show();
//
// +---------+-----+
// | path|value|
// +---------+-----+
// |a/a/a/b/c| abc|
// |a/b/c/d/e| abc|
// +---------+-----+
此类查询的物理计划如下所示:
== Physical Plan ==
Limit 21
ConvertToSafe
LeftSemiJoinBNL Some(Contains(path#0, path#2))
ConvertToUnsafe
Scan PhysicalRDD[path#0,value#1]
TungstenProject [path#2]
Scan PhysicalRDD[path#2,value#3]
它将使用左半连接BNL进行实际连接操作,该操作应在内部广播值。从更多细节参考火花中的实际实现 - 左半加入BNL.scala
附言:我不太明白删除最后两个字符的必要性,但如果需要 - 可以做到,就像@zero323建议的那样(使用regexp_extract
)。
你的代码至少存在几个问题:
的数据帧
根本无法正常工作,您应该得到一个例外。联接
作为笛卡尔积执行,后跟过滤器。由于 Spark 使用哈希进行
连接,因此只有在没有笛卡尔的情况下,才能有效地执行基于相等的连接。它与为什么在SQL查询中使用UDF会导致笛卡尔积略有关系?数据帧
都相对较大并且具有相似的大小,则广播不太可能有用。查看为什么我的广播哈希连接比 Spark 中的随机哈希链接慢前缀
似乎是错误的。特别是它看起来可以匹配前缀和后缀“第一路径”)
条件看起来不对。我假设您希望 df1
中的 a/a/a/b/c
与 df2
中的 a/a/a
匹配。如果是这样,它应该是gt
而不是lt
。也许你能做的最好的事情是类似这样的事情:
import org.apache.spark.sql.functions.{col, regexp_extract}
val df = sc.parallelize(Seq(
("a/a/a/b/c", "abc"), ("a/a/a","qwe"),
("a/b/c/d/e", "abc"), ("a/b/c", "qwe"),
("a/b/b/k", "foo"), ("a/b/f/a", "bar")
)).toDF("path", "value")
val df1 = df
.where(col("value") === "abc")
.withColumn("path_short", regexp_extract(col("path"), "^(.*)(/.){2}$", 1))
.as("df1")
val df2 = df.where(col("value") === "qwe").as("df2")
val joined = df1.join(df2, col("df1.path_short") === col("df2.path"))
你可以试着像这样广播其中一个表(Spark
import org.apache.spark.sql.functions.broadcast
df1.join(broadcast(df2), col("df1.path_short") === col("df2.path"))
并增加自动广播限制,但正如我在上面提到的,它很可能不如普通的HashJoin
有效。
我正在尝试基于第二个数据框的值周围的范围创建一个数据框的子集,我一直在进行研究,但我就是想不出如何去做。我在这里使用了虚拟数据,因为它们都是包含许多列的大型数据集。 数据帧1(df1)有50列,数千条不同纬度的记录 数据帧2(df2)有数百个城镇,都位于不同纬度,比df1小得多 我需要df1的一个子集,它只包括纬度在df2纬度0.01范围内的行。所以代码需要查看df1的每一行,并根据df2的每一行
我有两个数据帧df1和df2。df1就像一个具有以下值的字典 df2具有以下值: 我想基于df1数据帧中的,将df2拆分为3个新的数据帧。 日期,TLRA_权益栏应位于数据框 预期产出: > 数据帧 消费者,非周期性数据帧 请让我知道如何有效地做。我想做的是连接列名,例如,然后根据列名的前半部分分割数据帧。 代码: 但这很复杂。需要更好的解决方案。
我有以下数据框: 我想根据以下条件对其进行过滤: 创建的角度=范围(87-92) GDT 1和GDT 2之间的距离 到目前为止我尝试了这个(最后一个方法): 此代码的输出是一个没有错误的空数据库。
我有两个数据帧df1和df2 df1如下 df2就像 我想根据df2中与df1中的列名匹配的单元格值将值从df1复制到df2,所以我的df3应该看起来像 df3 基本上,我想根据df2的单元格值(df1中的列名)从df1复制df2中的列 如果它仍然令人困惑,请告诉我
考虑以下数据帧: 使用dplyr,我如何在每一列(不隐式命名)上筛选所有大于2的值。 模拟假设的
我有一个这样的df: 对于每个ID,我有5列A1到A5(实际上我有更多),并且这些值是特定ID的最高优先级。 例如:ID 1将A1、A3和A5作为优先级,ID 3只有2个A2和A1,ID 5没有优先级 合成DF 我尝试使用和使用这个和这个以及更多的方法来实现相同的功能,但无法获得相同的结果df。 这方面的任何帮助或我这边的清晰度!!