当前位置: 首页 > 知识库问答 >
问题:

Spark RDD 加入 Cassandra Table

唐法
2023-03-14

我加入了SparkRDDCassandra表(查找),但我不能理解一些东西。

  1. 将火花从Cassandra表中拉取range_start和range_end之间的所有记录,然后在火花内存中与RDD连接,或者将所有值从RDD下推到Cassandra并在那里执行连接
  2. 限制(1)将应用在哪里?(CassandraSpark
  3. 无论应用什么限制(1或1000),Spark总是从Cassandra中提取相同数量的记录吗?

以下代码:

//creating dataframe with fields required for join with cassandra table
//and converting same to rdd
val df_for_join = src_df.select(src_df("col1"),src_df("col2"))
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
.joinWithCassandraTable("my_keyspace", "my_table"
,selectedColumns = SomeColumns("col1","col2","col3","col4")
,SomeColumns("col1", "col2")
).where("created_at >''range_start'' and created_at<= range_end")
.clusteringOrder(Ascending).limit(1)

Cassandra表格详细信息-

PRIMARY KEY ((col1, col2), created_at) WITH CLUSTERING ORDER BY (created_at ASC)

共有1个答案

巢皓君
2023-03-14

joinWithCassandra 表从传递的 RDD 中提取分区/主键值,并将它们转换为针对 Cassandra 中分区的单个请求。然后,最重要的是,SCC 可能会应用额外的过滤,例如,您在哪里条件。如果我没记错的话,但我可能是错的,限制不会完全推到 Cassandra - 它仍然可能为每个分区获取限制行。

您始终可以通过执行 result_rdd.toDebugString 来检查连接发生的位置。对于我的代码:

val df_for_join = Seq((2, 5),(5, 2)).toDF("col1", "col2")
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
.joinWithCassandraTable("test", "jt"
,selectedColumns = SomeColumns("col1","col2", "v")
,SomeColumns("col1", "col2")
).where("created_at >'2020-03-13T00:00:00Z' and created_at<= '2020-03-14T00:00:00Z'")
.limit(1)

它给出了以下内容:

scala> result_rdd.toDebugString
res7: String =
(2) CassandraJoinRDD[14] at RDD at CassandraRDD.scala:19 []
 |  MapPartitionsRDD[2] at rdd at <console>:45 []
 |  MapPartitionsRDD[1] at rdd at <console>:45 []
 |  ParallelCollectionRDD[0] at rdd at <console>:45 []

而如果您进行“正常”联接,您将获得以下内容:

scala> val rdd1 = sc.parallelize(Seq((2, 5),(5, 2)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:44
scala> val ct = sc.cassandraTable[(Int, Int)]("test", "jt").select("col1", "col2")
ct: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int)] = CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19

scala> rdd1.join(ct)
res15: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[34] at join at <console>:49
scala> rdd1.join(ct).toDebugString
res16: String =
(6) MapPartitionsRDD[37] at join at <console>:49 []
 |  MapPartitionsRDD[36] at join at <console>:49 []
 |  CoGroupedRDD[35] at join at <console>:49 []
 +-(3) ParallelCollectionRDD[21] at parallelize at <console>:44 []
 +-(6) CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19 []

更多信息可在SCC文件的相应章节中找到。

 类似资料:
  • 把下列的jar放入WebContent/WEB-INF/lib下 quartz-2.2.1.jar nutz-integration-quartz-1.r.60.r2.jar 在conf目录下,新建一个文件叫quartz.properties,内容如下 org.quartz.scheduler.instanceName = NutzbookScheduler org.quartz.threadPo

  • 首先,下面罗列的jar 放入 WebContent/WEB-INF/lib中 shiro-all-1.3.2.jar slf4j-api-1.7.12.jar slf4j-log4j12-1.7.12.jar commons-beanutils-1.9.2.jar commons-logging-1.2.jar nutz-integration-shiro-1.r.60.r2.jar 在conf目

  • 问题内容: 我刚进入hibernate状态,遇到了以下问题:我收到了 “希望加入的路径!” 当我尝试运行此查询时出现异常: 我想选择给定航班已售出机票的平均价格。 我的代码是: Flight.hbm.xml Ticket.hbm.xml 所有其他没有JOIN的查询都可以正常工作。我不知道问题出在哪里。 正确的查询是: 并与查询执行一起: 问题答案: 如您所链接的问题和Hibernate文档中所述,

  • 问题内容: 我以为我知道该如何使用,但显然不知道如何使用。谁能帮我? 这给我例外 与拥有OneToMany关系。 我的第二个问题是,说此查询返回唯一的结果,那么如果我这样做 代表上面的查询名称。那么,将其串联在一起还是会得到回报? 问题答案: JPQL中的一对多关系如下所示: 在子句中指定多个属性时,结果返回为: 顺便说一下,为什么您的实体以复数形式命名,这令人困惑。如果要使用复数形式的表名,则可

  • 我有一个hibernate映射问题。我有以下两个DB表(不允许我更改DB): 我试图为这些DB表创建实体,但不知道如何映射表之间的连接。以下是我的尝试(但它是错误的): 可嵌入类 嵌入使用 困难在于我想在一个列和embeddedId列的一部分之间建立一个单一的连接。对这个问题有什么想法吗?(我正在使用Hibernate4.0.1)

  • 仅限于网易内部支持并不是 Megalo 的初衷,我们致力于成为开源社区内「比较好用」的小程序解决方案。 要实现这个目标并不是易事,离不开社区的力量,我们诚邀诸位有追求有想法的同道中人共同参与 Megalo 的孵化、布道。

  • 本章介绍如何在GitHub上注册账号,并以现有项目为例介绍GitHub的主要功能。 2.1. 创建GitHub账号 2.2. 浏览托管项目 2.3. 社交网络

  • 我刚开始Hibernate,遇到了以下问题:我得到了“加入的路径!” 尝试运行此查询时出现异常: 我想选择给定航班已售出的机票的平均价格。 我已经检查了这些链接,但我没有解决我的问题:HQL左连接:连接预期的路径hql内部连接预期的路径!错误 我的代码是: 航班.hbm.xml 票据.hbm.xml 没有JOIN的所有其他查询都可以正常工作。我不知道问题出在哪里。 正确的问题是: 连同查询执行: