当前位置: 首页 > 知识库问答 >
问题:

将包含n个scala类对象的火花RDD插入cassandra db

卜盛
2023-03-14

假设我有一个scala类的五个对象,我需要用五个对象构建一个spark RDD,并将该RDD推到cassandra表中,我的cassandr表“person”有三个字段(pId、pName、pAge)和

val object 1= new myclass(1,"abc",24)
val object2 = new myclass(2,"pqr",23)
val object3 = new myclass(3,"xyz",26)

我如何形成这三个对象的rdd?下面的行可能吗?

val collection=context.parallelize(Seq(object1,object2,object3))

如果可以制作RDD..如何将该RDD推送到Cassandra表以在该表“person”中插入三行

共有1个答案

简俊楚
2023-03-14

最简单的方法是创建一个 CaseClass,其中类与表中的行匹配

case class PersonRow(pID: int, pName: String, pAge: Int)
context.parallelize(Seq(
  PersonRow(1, "abc", 24),
  PersonRow(2, "pqr", 23),
  PersonRow(3, "xyz", 26)
)).saveToCassandra("ks","person")

有关更多信息,请参阅Spark Cassandra连接器文档

< code>mapToRow在Scala代码中是不必要的,因为它基本上是Scala中缺乏隐含性的一种变通方法。SaveToCassandra通常使用一个隐式的< code>RowWriterFactory,Scala可以通过查看RDD类类型来为您实现这一点。在Java中,工厂必须手动创建。

scala> class SomeRandomClass (val k:Int, val v:Int) {
     | def fun() = {println("lots of fun")}
     | val somethingElse:Int = 5
     | }
defined class SomeRandomClass

scala> sc.parallelize(1 to 10).map( x => new SomeRandomClass(x,x)).saveToCassandra("test","test")

scala> sc.cassandraTable("test","test")
res4: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[7] at RDD at CassandraRDD.scala:15

scala> sc.cassandraTable("test","test").collect
res5: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{k: 5, v: 5}, CassandraRow{k: 10, v: 10}, CassandraRow{k: 1, v: 1}, CassandraRow{k: 8, v: 8}, CassandraRow{k: 2, v: 2}, CassandraRow{k: 4, v: 4}, CassandraRow{k: 7, v: 7}, CassandraRow{k: 6, v: 6}, CassandraRow{k: 9, v: 9}, CassandraRow{k: 3, v: 3})

注意,只有在类的字段(k和v)与表中的列“k和v”之间存在映射时,这种方法才有效。

 类似资料:
  • 问题内容: 在我的猪代码中,我这样做: 我想用spark做同样的事情。但是,不幸的是,我看到我必须成对进行: 是否有联合运算符可以让我一次对多个rdds进行操作: 例如 这是一个方便的问题。 问题答案: 如果这些是RDD,则可以使用方法: 没有等效项,但这只是一个简单的问题: 如果要在RDD上大量使用和重新创建,可能是避免与准备执行计划的成本相关的问题的更好选择:

  • 我有RDD,其中每个记录都是int: 我所需要做的就是将这个RDD拆分成批。即。制作另一个RDD,其中每个元素都是固定大小的元素列表: 这听起来微不足道,然而,最近几天我很困惑,除了下面的解决方案之外,什么也找不到: > 使用ZipWithIndex枚举RDD中的记录: 这将得到我所需要的,然而,我不想在这里使用组。当您使用普通映射Reduce或一些抽象(如Apache Crunch)时,它是微不

  • 我想了解以下关于火花概念的RDD的事情。 > RDD仅仅是从HDFS存储中复制某个节点RAM中的所需数据以加快执行的概念吗? 如果一个文件在集群中被拆分,那么对于单个flie来说,RDD从其他节点带来所有所需的数据? 如果第二点是正确的,那么它如何决定它必须执行哪个节点的JVM?数据局部性在这里是如何工作的?

  • 我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的

  • 问题内容: 如何Foo从Java访问包对象中包含的对象? 问题答案: 也许从Scala 2.8.1开始,这已经发生了变化,但是该提议不起作用。您必须使用。 对于对象,方法等,它有所不同。考虑到scala类: 您可以访问foo,bar而baz在Java中,如: 当我试图弄清楚这一点时,我以为我们遇到了麻烦,因为Scala生成了一个名为的类,当然您不能在Java中导入该类。幸运的是,我们只需要pack

  • 嗨,我正在尝试生成Salt示例的输出,但没有使用文档中提到的docker。我找到了帮助生成输出的scala代码,这是main.scala。我将main.scala修改为一个方便的main.scala, 我为这个scala创建了一个单独的文件夹, calac-cp“lib/salt.jar:lib/spark.jar”main.scala 这已成功运行并在文件夹BinexTest下生成类。 现在,项