当前位置: 首页 > 知识库问答 >
问题:

按 RDD 值从 Cassandra 表中筛选

李意致
2023-03-14

我想根据RDD中的值从Cassandra查询一些数据。我的方法如下:

val userIds = sc.textFile("/tmp/user_ids").keyBy( e => e ) 
val t = sc.cassandraTable("keyspace", "users").select("userid", "user_name") 
val userNames = userIds.flatMap { userId => 
  t.where("userid = ?", userId).take(1) 
} 
userNames.take(1) 

虽然Cassandra查询在Spark shell中工作,但当我在平面图中使用它时,它会引发异常:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException: 
        org.apache.spark.rdd.RDD.<init>(RDD.scala:125) 
        com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49) 
        com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83) 
        com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:94) 

我的理解是,我不能在另一个RDD内部产生一个RDD。

我在网络上找到的示例读取RDD中的整个Cassandra表并连接RDD(如下所示:https://cassandrastuff.wordpress.com/2014/07/07/cassandra-and-spark-table-joins/)。但是如果Cassandra表很大,它就不会扩展。

但是我该如何解决这个问题呢?

共有1个答案

荆钱明
2023-03-14

Spark 1.2引入joinWithCassandraTable

val userids = sc.textFile("file:///Users/russellspitzer/users.list")
userids
 .map(Tuple1(_))
 .joinWithCassandraTable("keyspace","table")

此代码最终将执行与下面的解决方案相同的工作。join

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable

我认为您实际上想要做的是在两个数据源上做一个内部连接。这实际上应该比平面图方法更快,并且有一些内部智能散列。

scala> val userids = sc.textFile("file:///Users/russellspitzer/users.list")
scala> userids.take(5)
res19: Array[String] = Array(3, 2)

scala> sc.cassandraTable("test","users").collect
res20: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{userid: 3, username: Jacek}, CassandraRow{userid: 1, username: Russ}, CassandraRow{userid: 2, username: Helena})

scala> userids.map(line => (line.toInt,true)).join(sc.cassandraTable("test","users").map(row => (row.getInt("userid"),row.getString("username")))).collect
res18: Array[(Int, (Boolean, String))] = Array((2,(true,Helena)), (3,(true,Jacek)))

如果您实际上只是想对C*数据库执行一系列主键查询,那么使用普通的驱动程序路径而不是spark来执行它们可能会更好。

import com.datastax.spark.connector.cql.CassandraConnector
import collection.JavaConversions._

val cc = CassandraConnector(sc.getConf)
val select = s"SELECT * FROM cctest.users where userid=?"
val ids = sc.parallelize(1 to 10)
ids.flatMap(id =>
      cc.withSessionDo(session =>
        session.execute(select, id.toInt: java.lang.Integer).iterator.toList.map(row =>
          (row.getInt("userid"), row.getString("username"))))).collect

 类似资料:
  • 我有一个csv,它具有以下结构:

  • 我在spark中有以下命令, 有一组单词,data有三个字符串列,取自。 现在,只要中每个单词的单词模式出现在三列数据中的任何一列中,我就希望过滤掉数据中的行(spark dataframe)。 例如,如果有诸如之类的单词,并且如果三列数据中的任何一列包含诸如、等值,我希望过滤掉该行。 我尝试了以下方法: 这只适用于一个词。但是我想检查中的所有单词并删除它。有办法做到这一点吗? 我对Pyspark

  • 有一个数据帧: 以及熊猫系列: 如何创建包含c1在list1中的行的新数据帧。 输出:

  • 我有这个json: 我想使用通过其名称查找元素“foo”(“foo”) 我尝试了类似 但它似乎不起作用(我检查了 https://jsonpath.com)

  • 我试图通过值过滤我的Json中的一个数组与Jsonpath。我想在下面的JSON中获得国家的long_name。为了做到这一点,我通过类型[0]==“国家”过滤adress_components,但它似乎不起作用。 我试过的JsonPath: 我想要的结果是:“加拿大”。 JSON: 谢谢你的帮助。

  • 我希望能够从外部文件的值中筛选excel电子表格中的列值。 例如:列A列出了在计算机上运行的一堆程序。我在一个. txt或. csv文件中有一个“认可程序”的白名单。 如果单元格包含与. csv文件中的一个条目匹配的值,我想编写一个脚本,将从“列A”中删除条目。 例如:“A列”包含数百条从“Adobe”到“Xerox”的条目。我有我的“认可软件”清单,其中包括Adobe和Xerox。我希望Exce