我用这些参数启动火花2.3.1的火花外壳:
--master='local[*]
--执行器内存=6400M
--驾驶员记忆=60G
——conf spark。sql。autoBroadcastJoinThreshold=209715200
——conf spark。sql。洗牌分区=1000
——conf spark。地方的dir=/data/spark temp
——conf spark。驾驶员extraJavaOptions='-Dderby。系统主页=/data/spark catalog/'
然后创建两个带有排序和存储桶的蜂箱表
第一个表名-表1
第二个表名-table2
val storagePath = "path_to_orc"
val storage = spark.read.orc(storagePath)
val tableName = "table1"
sql(s"DROP TABLE IF EXISTS $tableName")
storage.select($"group", $"id").write.bucketBy(bucketsCount, "id").sortBy("id").saveAsTable(tableName)
(表2的代码相同)
我希望当我用另一个df连接这些表时,查询计划中没有不必要的交换步骤
然后我关闭广播使用SortMergeJoin
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
我拿一些df
val sample = spark.read.option("header", "true).option("delimiter", "\t").csv("path_to_tsv")
val m = spark.table("table1")
sample.select($"col" as "id").join(m, Seq("id")).explain()
== Physical Plan ==
*(4) Project [id#24, group#0]
+- *(4) SortMergeJoin [id#24], [id#1], Inner
:- *(2) Sort [id#24 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#24, 1000)
: +- *(1) Project [col#21 AS id#24]
: +- *(1) Filter isnotnull(col#21)
: +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
+- *(3) Project [group#0, id#1]
+- *(3) Filter isnotnull(id#1)
+- *(3) FileScan parquet default.table1[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>
但当我在连接前对两个表使用union时
val m2 = spark.table("table2")
val mUnion = m union m2
sample.select($"col" as "id").join(mUnion, Seq("id")).explain()
== Physical Plan ==
*(6) Project [id#33, group#0]
+- *(6) SortMergeJoin [id#33], [id#1], Inner
:- *(2) Sort [id#33 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#33, 1000)
: +- *(1) Project [col#21 AS id#33]
: +- *(1) Filter isnotnull(col#21)
: +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
+- *(5) Sort [id#1 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#1, 1000)
+- Union
:- *(3) Project [group#0, id#1]
: +- *(3) Filter isnotnull(id#1)
: +- *(3) FileScan parquet default.membership_g043_append[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>
+- *(4) Project [group#4, id#5]
+- *(4) Filter isnotnull(id#5)
+- *(4) FileScan parquet default.membership_g042[group#4,id#5] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>
在这种情况下出现了排序和分区(步骤5)
如何在不进行排序和交换的情况下合并两个蜂窝表
据我所知,Spark在连接时不考虑排序,而只考虑分区。所以为了获得高效的连接,您必须按同一列进行分区。这是因为排序并不能保证具有相同键的记录最终会在同一个分区中。Spark必须确保具有相同值的所有键都从多个数据框中被洗牌到同一个分区和同一个执行程序上。
我正在回顾一个旧的Spark软件,它必须并行运行许多小的查询和计数()并使用直接的hive-sql。 在过去,该软件通过在shell()上直线运行每个查询来解决“并行化查询的问题”。我不能用现代新鲜的Spark,此刻只有Spark V2.2。下面的片段说明了完整的SQL查询方法。 有一种“Spark方式”可以访问Hive并运行SQL查询,性能(略)更好,而且Spark配置的重用性更好? 没有丢失纯
如果我想重新分区一个数据帧,如何决定需要做的分区数量?如何决定是使用重新分区还是合并?我知道合并基本上只是为了减少分区的数量。但是我们如何决定在什么情况下使用哪个呢?
蜂巢平台(OpenComb Platform)是一个基于 PHP 5.3 实现的深度云计算应用框架。蜂巢采用了扩展模式,系统中的功能和特性,都由扩展提供。 因此,用户可以通过开发和安装扩展来部署各种类型的互联网应用。
ApplicationMaster:用户类引发异常:org。阿帕奇。火花sql。AnalysisException:未找到表或视图:“DB\U X”。“表Y” Spark会话: 配置单元站点中的配置单元仓库目录。xml:/apps/hive/warehouse/ 此处抛出错误: 在java中: 在spark shell interactive中: 显示创建表table\u Y: Hadoop文件:
问题内容: 在我的猪代码中,我这样做: 我想用spark做同样的事情。但是,不幸的是,我看到我必须成对进行: 是否有联合运算符可以让我一次对多个rdds进行操作: 例如 这是一个方便的问题。 问题答案: 如果这些是RDD,则可以使用方法: 没有等效项,但这只是一个简单的问题: 如果要在RDD上大量使用和重新创建,可能是避免与准备执行计划的成本相关的问题的更好选择:
我正在执行一个HQL查询,该查询几乎没有连接、联合和插入覆盖操作,如果只运行一次,它就可以正常工作。 如果我第二次执行相同的作业,我就会面临这个问题。有人能帮我确定在哪种情况下我们会得到这个异常吗?