我有两个大的Hive表,我想用spark.sql将它们连接起来。表格采用snappy格式,在Hive中存储为拼花文件。
我想加入它们并对某些列进行一些聚合,假设计算所有行和一列的平均值(例如 doubleColumn),同时使用两个条件进行过滤(假设在 col1,col2 上)。
注意:我在一台机器上进行测试安装(虽然功能非常强大)。我希望集群中的性能可能会有所不同。
我的第一个尝试是使用spark-sql,如:
val stat = sqlContext.sql("select count(id), avg(doubleColumn) " +
" FROM db.table1 as t1 JOIN db.table2 " +
" ON t1.id = t2.id " +
" WHERE col1 = val1 AND col2 = val2").collect
不幸的是,即使我给每个执行器和驱动程序至少8 gb的内存,它也只能运行5分钟。我还尝试使用dataframe语法,并尝试先过滤行,然后只选择特定的列,以获得更好的选择性,例如:
//Filter first and select only needed column
val df = spark.sql("SELECT * FROM db.tab1")
val tab1= df.filter($"col1" === "val1" && $"col2" === "val2").select("id")
val tab2= spark.sql("SELECT id, doubleColumn FROM db.tab2")
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id")
//Take the aggregations on the joined df
import org.apache.spark.sql.functions;
joined.agg(
functions.count("id").as("count"),
functions.avg("doubleColumn").as("average")
).show();
但是这没有显著的性能提升。我如何提高连接中的性能?
>
spark.sql或数据帧语法的最佳方法是什么?
给更多的执行者或记忆会有帮助吗?
我应该使用缓存吗
我缓存了两个数据帧tab1、tab2和联接聚合,但我认为缓存我的数据帧并不实用,因为我们对并发感兴趣,许多用户同时请求一些分析查询。
是否无事可做,因为我在单节点上工作,当我转到群集上的生产环境时,我的问题会消失?
额外的问题:我用Impala尝试了这个查询,它做了大约40秒,但比spark.sql要好得多。Impala怎么比spark更好?!
您可以更改配置,无论如何都必须在大型集群上更改它们。我马上就能想到两件事。将 spark.executor.cores
设置为 5,并且根据内存,使用 spark.executor.instances 和 spark.executor.memory
提供更多的执行器和更多内存。您还可以按某些列对配置单元表进行存储桶和排序吗?如果您对表进行桶处理,那么它将消除在加入表之前对表进行排序的需要。
如果在连接之后缓存数据帧,速度可能会更快,这取决于catalyst处理聚合查询的方式。您也可以在查询结束后< code>unpersist(),但是我同意GC可能不值得这么做。
使用SQL或scala dsl不会有任何好处。两者都使用全阶段html" target="_blank">代码生成,所以它们本质上是一样的。
Impala总是更快的一个原因是,它从不担心复制,尽管有一个节点不应该那么麻烦,但在预加载数据以进行复制和不需要复制之间可能没有一个完美的火花分离。
哪个是最好的方法来做到这一点spark.sql或数据帧语法?
没有任何区别。
给更多的执行者或记忆会有帮助吗?
仅当问题不是由数据倾斜引起的,并且您正确调整配置时。
我应该使用缓存吗?
如果输入数据被多次重用,那么从性能角度来看,这可能是明智的(正如您已经确定的那样)。
是否无事可做,因为我在单节点上工作,当我转到群集上的生产环境时,我的问题会消失?
通常,在单个节点上进行性能测试是完全无用的。它忽略了瓶颈(网络 IO/通信)和优势(摊销磁盘 I/O 和资源使用情况)。
但是,您可以显著减少parallels sm(< code > spark . SQL . shuffle . partitions ,< code > SQL . default . parallelism 并增加输入拆分大小)。反直觉的Spark风格的并行性是为分配负载而设计的,它在单台机器上更像是一种负担,而不是资产。这取决于洗牌(磁盘写入!)对于通信来说,与共享内存相比,速度非常慢,而且调度开销也很大。
Impala怎么比spark更好?!
因为它是专门为低延迟并发查询而设计的。这不是Spark的目标(数据库与ETL框架)。
当你
由于我们对并发感兴趣,许多用户同时询问一些分析查询。
Spark听起来不像是一个正确的选择。
我们正在尝试在纱线上运行我们的火花集群。我们有一些性能问题,尤其是与独立模式相比。 我们有一个由5个节点组成的集群,每个节点都有16GB的RAM和8个核心。我们已将纱线站点中的最小容器大小配置为3GB,最大为14GB。xml。向纱线集群提交作业时,我们提供的执行器数量=10,执行器内存=14 GB。根据我的理解,我们的工作应该分配4个14GB的容器。但spark UI仅显示3个容器,每个容器的容量
我有一些Spark经验,但刚开始使用Cassandra。我正在尝试进行非常简单的阅读,但性能非常差——不知道为什么。这是我正在使用的代码: 所有3个参数都是表上键的一部分: 主键(group\u id,epoch,group\u name,auto\u generated\u uuid\u field),聚类顺序为(epoch ASC,group\u name ASC,auto\u generat
我要加入两个rdd。 示例文件1数据: 示例文件2数据: 下面是代码: o/p是k,(v),我想在做进一步处理时去掉值两边的括号。我尝试了一些事情,包括 我还保存了结果: 不幸的是,结果总是以下格式: 我希望他们:
我正在研究建立一个JDBC Spark连接,以便从r/Python使用。我知道和都是可用的,但它们似乎更适合交互式分析,特别是因为它们为用户保留了集群资源。我在考虑一些更类似于Tableau ODBC Spark connection的东西--一些更轻量级的东西(据我所知),用于支持简单的随机访问。虽然这似乎是可能的,而且有一些文档,但(对我来说)JDBC驱动程序的需求是什么并不清楚。 既然Hiv
我现在有一个spark工作,它从HDFS中提取数据,并将数据转换为平面文件,以加载到Cassandra中。
我在Scala/Spark中有一个批处理作业,它根据一些输入动态创建Drools规则,然后评估规则。我还有一个与要插入到规则引擎的事实相对应的输入。 到目前为止,我正在一个接一个地插入事实,然后触发关于这个事实的所有规则。我正在使用执行此操作。 seqOp运算符的定义如下: 以下是生成的规则的示例: 对于同一RDD,该批次花了20分钟来评估3K规则,但花了10小时来评估10K规则! 我想知道根据事