当前位置: 首页 > 知识库问答 >
问题:

在Spark中选择独特的Cassandra

从景曜
2023-03-14

我需要一个查询来列出 Spark 中唯一的复合分区键。
CASSANDRA: SELECT DISTINCT key1, key2, key3 FROM schema.table; 中的查询非常快,但是将相同类型的数据过滤器放在 RDD 或 Spark 中.sql相比之下检索结果的速度非常慢。

例如

---- SPARK ----
var t1 = sc.cassandraTable("schema","table").select("key1", "key2", "key3").distinct()
var t2 = spark.sql("SELECT DISTINCT key1, key2, key3 FROM schema.table")

t1.count // takes 20 minutes
t2.count // takes 20 minutes

---- CASSANDRA ----
// takes < 1 minute while also printing out all results
SELECT DISTINCT key1, key2, key3 FROM schema.table; 

其中表格格式如下:

CREATE TABLE schema.table (
    key1 text,
    key2 text,
    key3 text,
    ckey1 text,
    ckey2 text,
    v1 int,
    PRIMARY KEY ((key1, key2, key3), ckey1, ckey2)
);

Spark不是在其查询中使用cassandra优化吗?
我如何有效地检索此信息?

共有3个答案

宫修贤
2023-03-14

Distinct性能很差。这里有一个很好的答案和一些备选方案:如何根据列的子集在RDD上有效地选择不同的行

您可以利用toDebugString来了解您的代码打乱了多少数据。

陈淳
2023-03-14

只要我们选择分区键,我们就可以使用CassandraRDD的. per分区限制函数:

val partition_keys = sc.cassandraTable("schema","table").select("key1", "key2", "key3").perPartitionLimit(1)

这是因为,根据SPARKC-436

从每个分区限制 1 的some_table中选择键

给出的结果与

从some_table中选择不同的键

此功能在 spark-cassandra-connector 2.0.0-RC1 中引入,至少需要 C* 3.6

司寇光华
2023-03-14

spark在其查询中不使用cassandra优化吗?

对但对于SparkSQL,只有列修剪和谓词下推。在RDD中,它是手动的。

我如何有效地检索这些信息?

因为您的请求返回得足够快,所以我会直接使用Java驱动程序来获得这个结果集。

虽然Spark SQL可以提供一些基于C*的优化,但在使用DataFrame接口时,这些优化通常仅限于谓词下推。这是因为框架只向数据源提供有限的信息。我们可以通过对您编写的查询进行解释来了解这一点。

scala> spark.sql("SELECT DISTINCT key1, key2, key3 FROM test.tab").explain
== Physical Plan ==
*HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
+- Exchange hashpartitioning(key1#30, key2#31, key3#32, 200)
   +- *HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
      +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation test.tab[key1#30,key2#31,key3#32] ReadSchema: struct<key1:string,key2:string,key3:string>

因此,您的Spark示例实际上将分为几个步骤。

    < li >扫描:从该表中读取所有数据。这意味着将C机器中的每个值序列化到Spark Executor JVM中,换句话说,需要做大量的工作。 < Li > * Hash Aggregate/Exchange/Hash Aggregate:从每个执行器获取值,在本地对它们进行哈希运算,然后在机器之间交换数据,并再次进行哈希运算以确保唯一性。通俗地说,这意味着创建大型散列结构,序列化它们,运行复杂的分布式sortmerge,然后再次运行散列。(昂贵)

为什么这些都没有被推到C*?这是因为Datasource(在本例中为CassandraSourceRelation)没有提供关于查询的Distinct部分的信息。这只是Spark当前工作方式的一部分。关于什么是可推送的文档

在RDDS,我们给Spark一套直接的指令。这意味着如果你想把某样东西推下去,必须手动指定。让我们看看RDD请求的调试输出

scala> sc.cassandraTable("test","tab").distinct.toDebugString
res2: String =
(13) MapPartitionsRDD[7] at distinct at <console>:45 []
 |   ShuffledRDD[6] at distinct at <console>:45 []
 +-(13) MapPartitionsRDD[5] at distinct at <console>:45 []
    |   CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:19 []

这里的问题是,您的“不同”调用是RDD上的通用操作,而不是特定于Cassandra的。由于RDD要求所有优化都是明确的(您键入的就是您得到的),Cassandra从未听说过对“不同”的需求,我们得到了一个与SparkSQL版本几乎相同的计划。做一次全面扫描,将所有数据从Cassandra序列化到Spark。做一次洗牌,然后返回结果。

使用SparkSQL,这几乎是我们在不向Catalyst(SparkSQL/Dataframes优化器)添加新规则的情况下所能得到的最好结果,以让它知道Cassandra可以在服务端处理一些不同的调用。然后需要为CassandraRDD子类实现它。

对于RDD,我们需要添加一个类似于已经存在的的函数,其中select Distinct调用,尽管它仅在特定情况下才允许。这是一个目前不存在于SCC中的函数,但可以相对容易地添加,因为它所做的只是将 DISTINCT添加到请求中,并可能添加一些检查以确保它是一个有意义的 DISTINCT

因为我们知道确切的CQL请求,所以我们总是可以直接使用Cassandra驱动程序来获取这些信息。Spark Cassandra连接器提供了我们可以使用的驱动程序池,或者我们可以直接使用Java驱动程序。要使用这个游泳池,我们可以这样做

import com.datastax.spark.connector.cql.CassandraConnector
CassandraConnector(sc.getConf).withSessionDo{ session => 
  session.execute("SELECT DISTINCT key1, key2, key3 FROM test.tab;").all()
}

如果进一步的Spark工作需要,然后并行化结果。如果我们真的想分发这个,很可能有必要将该功能添加到Spark Cassandra连接器中,如上所述。

 类似资料:
  • 基于SparkML的特征选择(Feature Selectors)三个算法(VectorSlicer、RFormula以及ChiSqSelector)结合Demo进行一下理解 VectorSlicer算法介绍: VectorSlicer是一个转换器输入特征向量,输出原始特征向量子集。VectorSlicer接收带有特定索引的向量列,通过对这些索引的值进行筛选得到新的向量集。可接受如下两种索引: 1

  • 我正在使用OpenLayers并加载两个GEOJson文件,这些文件工作得非常好。现在,我希望能够选择GEOJson文件的各个组件。例如,如果在激活悬停选择时有多个特征(两个多边形),则不会发生任何情况。 我在这个示例中添加了悬停选择器http://openlayers.org/dev/examples/geojson.html 我实际上可以独立拖动元素,但我不能选择它们。我想是因为我使用的投影。

  • 我是一个新生的QA,我在这里尝试编写一个Protractor脚本,从下拉列表中选择一个选项。我的下拉列表中有两个选项,我正在尝试从数字中选择它。 这是我正在使用的代码。 默认情况下,选项 2 在页面加载时处于选中状态。我需要的是从下拉列表中选择选项1。但是,我的代码不会这样做。 下面是选择选项的代码片段。 提前感谢:)

  • 问题内容: 我一直在寻找答案,但是找不到完全根据条件获取此不同记录集的方法。我有一个包含以下示例数据的表: 我想创建一个查询,以显示唯一位置数量大于1的每种不同Type + Color的唯一位置的计数,例如 请注意,{Apple,Red,1}不会出现,因为红苹果(Chicago)只有1个位置。我想我已经有了这个(但是也许有一个更简单的方法)。我正在使用: 当唯一查询的唯一位置计数大于1时,如何创建

  • 我将html写成 现在我希望如果选项是1,那么它应该被选择为 请帮帮我!!!

  • 我有一个名为“survey_product”的表,其结构如下: 此表存储通过系统订购的产品。 下面是该表中的一些数据记录: 以上我们有3个订单(订单号2、3和4)。 我需要获取product\u id=21但product\u id=20的所有订单的order\u id(订单中的其他product\u id不相关-我只对21和20感兴趣)。 然后我需要知道product\u id为20和21的订单