当前位置: 首页 > 知识库问答 >
问题:

Spark 不推送筛选器(推送筛选器数组为空)

单修德
2023-03-14

介绍

我注意到我们项目中的推送过滤器都不起作用。它解释了为什么执行时间受到影响,因为它读取了数百万次,而它应该将它减少到几千次。为了调试这个问题,我编写了一个小测试,读取CSV文件,过滤内容(下推过滤器)并返回结果。

它不能与CSV一起工作,所以我尝试读取一个拼花文件。没有一个能用的。

数据

人。csv文件具有以下结构:

first_name,last_name,city  // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row

名词(noun的缩写)镶木地板文件有相同的结构

读取CSV文件

为了重现这个问题,我编写了一个读取csv文件的最小代码,并且应该只返回过滤后的数据。

阅读csv文件并打印物理平面图:

Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);

物理计划:

---------- --------- ----
|first_name|last_name|城市|
---------- --------- ----
|第一名|LastName1|伯尔尼|
---------- --------- ----

== 解析的逻辑计划 == 关系[first_name#10,last_name#11,城市#12] csv

==分析的逻辑计划==first_name:字符串,last_name:字符串,city:字符串关系[first_name#10,last_name#11,city#12]csv

==优化的逻辑计划= = Relation[名字#10,姓氏#11,城市#12] csv

== 物理计划 == *(1) 文件扫描 csv [first_name#10,last_name#11,城市#12] 批处理:假,格式:CSV,位置:内存文件索引[文件:人.csv],分区筛选器:[],推送筛选器:[],读取模式:结构

我测试了镶木地板文件,结果是不幸的是相同的。

我们可以注意到的是:

  • PushedFilters为空,我希望过滤器包含谓词。
  • 返回的结果仍然正确。

我的问题是:为什么这个PushedFilters是空的?

N、 B:

  • Spark版本:2.4.3
  • 文件系统:ext4(和集群上的HDFS,两者都不起作用)

共有2个答案

孟胤
2023-03-14

只是为了文档,这是解决方案(感谢蜥蜴王):

结果

  • 之前: 推送筛选器: []
  • after: PushedFilters: [IsNotNull(city), EqualTo(city,Bern)]

密码

Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
Dataset<Row> dsFiltered = ds.where(col("city").equalTo("Bern"));
dsFiltered.explain(true);

物理计划

实体计划看起来好得多:

== Parsed Logical Plan ==
'Filter ('city = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv

== Analyzed Logical Plan ==
first_name: string, last_name: string, city: string
Filter (city#12 = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv

== Optimized Logical Plan ==
Filter (isnotnull(city#12) && (city#12 = Bern))
+- Relation[first_name#10,last_name#11,city#12] csv

== Physical Plan ==
*(1) Project [first_name#10, last_name#11, city#12]
+- *(1) Filter (isnotnull(city#12) && (city#12 = Bern))
   +- *(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:./people.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city), EqualTo(city,Bern)], ReadSchema: struct<first_name:string,last_name:string,city:string>
戚飞雨
2023-03-14

您正在对第一个数据集调用 explain ,即仅具有读取功能的数据集。尝试类似的东西(对不起,我只有可用的 Scala 环境):

val ds: DataFrame = spark.read.option("header", "true").csv("input.csv")
val f = ds.filter(col("city").equalTo("Bern"))

f.explain(true)

f.show()

此外,因此,使用类型化数据集 API 时要小心。不过不应该是你的情况。

 类似资料:
  • 我在spark 1.2.1上使用datastax/spark-cassandra-connector和充满1B+行的C*表(datastax-enterprise dse 4.7.0)。我需要对时间戳参数执行范围筛选/Where查询。 使用rdd和JoinWithCassandraTable还是使用数据帧和PushDown?还有别的事吗?

  • 此外,它在spark cassandra Connector1.4中工作,但不是与最新的cassandra Connector1.6.0-M1一起工作。请让我知道这个问题

  • 有什么建议吗?

  • 筛选器。 Usage 全部引入 import { Picker } from 'beeshell'; 按需引入 import Picker from 'beeshell/dist/components/Picker'; Examples Code import { Picker } from 'beeshell'; <Picker ref={(c) => { this._pick

  • 问题 你想要根据布尔条件来筛选数组。 解决方案 使用 Array.filter (ECMAScript 5): array = [1..10] array.filter (x) -> x > 5 # => [6,7,8,9,10] 在 EC5 之前的实现中,可以通过添加一个筛选函数扩展 Array 的原型,该函数接受一个回调并对自身进行过滤,将回调函数返回 true 的元素收集起来。 # 扩展 A

  • 问题内容: 我试图迅速过滤字典: 上面的过滤器代码在swift 2下编译,但产生以下错误: 无法将类型[[(String,String)]’的值分配给类型’[String:String]’的值 这是swift编译器中的错误,还是不是快速过滤字典的正确方法? 提前非常感谢您! 问题答案: 此问题已在Swift 4中修复 在Swift 4中,过滤后的字典返回字典。 Swift 2和3的原始答案 问题是