当前位置: 首页 > 面试题库 >

为什么SparkSession对一个动作执行两次?

仇经武
2023-03-14
问题内容

最近升级到Spark 2.0,尝试从JSON字符串创建简单的数据集时遇到一些奇怪的行为。这是一个简单的测试用例:

 SparkSession spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate();
 JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

 JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
            "{\"name\":\"tom\",\"title\":\"engineer\",\"roles\":[\"designer\",\"developer\"]}",
            "{\"name\":\"jack\",\"title\":\"cto\",\"roles\":[\"designer\",\"manager\"]}"
         ));

 JavaRDD<String> mappedRdd = rdd.map(json -> {
     System.out.println("mapping json: " + json);
     return json;
 });

 Dataset<Row> data = spark.read().json(mappedRdd);
 data.show();

并输出:

mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
+----+--------------------+--------+
|name|               roles|   title|
+----+--------------------+--------+
| tom|[designer, develo...|engineer|
|jack| [designer, manager]|     cto|
+----+--------------------+--------+

即使我仅执行一项操作,“
map”功能似乎仍被执行两次。我以为Spark会懒惰地建立一个执行计划,然后在需要时执行它,但这似乎使得为了将数据读取为JSON并对其进行任何处理,该计划必须至少执行两次。

在这种简单的情况下,这并不重要,但是当map函数长时间运行时,这将成为一个大问题。这是对的,还是我缺少什么?


问题答案:

这是因为您不提供的架构DataFrameReader。结果,Spark必须急切地扫描数据集以推断输出模式。

由于mappedRdd未缓存,因此将对其进行两次评估:

  • 一次用于模式推断
  • 一次致电 data.show

如果要阻止,则应为阅读器提供架构(Scala语法):

val schema: org.apache.spark.sql.types.StructType = ???
spark.read.schema(schema).json(mappedRdd)


 类似资料: