当前位置: 首页 > 知识库问答 >
问题:

Spark streaming不向Cassandra插入数据

古棋
2023-03-14

线程“streaming-job-executor-53”java.lang.noClassDefounderror:com/datastax/spark/connector/columnselector在com.enerbyte.spark.jobs.wattiopipeline.wattiopipelineStreamingJob$$anonFun$main$2.在com.enerbyte.spark.jobs.wattiopipelineStreamingJob.jobs.wattiopipelineStreamingJob.wattiopipelineStreamingJob.$$anonFun$main$2$anonfun$apply$mcv$sp$1.在org.apache.spark.streaming.dstream.foreachdstream$$anonfun$1$$anonfun$apply$mcv$sp$1.在org.apache.spark.streaming.dstream.foreachdstream.scala:50(foreachdstream.scala:50)在org.apache.spark.streaming.dstream.foreachdstream.scala:50)在org.apache.spark.streaming.dstream.foreachdstream.scala:50 foreachdstream$$anonfun$1.在org.apache.spark.streaming.dstream.foreachdstream$$anonfu应用$mcv$sp(foreachdstream.scala:49)n$1.在org.apache.spark.streaming.dstream.foreachdstream.scala:49)处应用(foreachdstream.scala:49)在scala.util.try处应用(foreachdstream.scala:49)在org.apache.spark.streaming.schedule.job.run处应用(try.scala:161)在org.apache.spark.streaming.schedule.job.run处应用(job.scala:39)在.在org.apache.spark.streaming.schedule.jobscheduler.scala:224)处应用(jobscheduler.scala:224)在scala.util.dynamicvariable.With Value(dynamicvariable.scala:57)在org.apache.spark.streaming.schedule.jobschedule.scala:223)在java.util.concurrent.threadpoolexecutor.junworker(threadpoolexecutor.worker.run(:617)在java.lang.thread.run(thread.java:745)处由:java.lang.ClassNotFoundException:com.datastax.spark.conne引起ctor.columnSelector在java.net.URLClassLoader.FindClass(URLClassLoader.java:381)在java.lang.ClassLoader.LoadClass(ClassLoader.java:424)在java.lang.ClassLoader.LoadClass(ClassLoader.java:357)

我为连接器添加了依赖关系,如下所示:

“com.datastax.spark”%%“Spark-Cassandra-连接器”%“1.5.0”%“提供”

这是我的申请html" target="_blank">代码:

    val measurements = KafkaUtils.createDirectStream[
  Array[Byte],
  Array[Byte],
  DefaultDecoder,
  DefaultDecoder](ssc, kafkaConfig, Set("wattio"
))
  .map {
    case (k, v) => {
      val decoder = new AvroDecoder[WattioMeasure](null,
        WattioMeasure.SCHEMA$)
      decoder.fromBytes(v)
    }
  }

//inserting into WattioRaw
WattioFunctions.run(WattioFunctions.
  processWattioRaw(measurements))(
  (rdd: RDD[
    WattioTenantRaw], t: Time) => {
    rdd.cache()
    //get all the different tenants
    val differentTenants = rdd.map(a
    => a.tenant).distinct().collect()
    // for each tenant, create keyspace value and flush to cassandra
    differentTenants.foreach(tenant => {
      val keyspace = tenant + "_readings"
      rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "wattio_raw")
    })
    rdd.unpersist(true)
  }
)

ssc.checkpoint("/tmp")
ssc.start()
ssc.awaitTermination()

共有1个答案

戴凯歌
2023-03-14

你需要确保你的罐子可以提供给工人。一旦作业开始执行,spark master将打开一个文件服务器。

您需要使用sparkcontext.setjars或通过传递给spark-submit--jars标志指定uber jar的路径。

从文档中

 类似资料:
  • 命令用于将数据插入到表的列中。 语法: 示例: 在之前的文章中,我们创建一个名为“”的表,其中包含列(, , ),需要在表中插入一些数据。 我们来看看向“”表中插入数据的代码 - 在执行上面语句插入数据后,可以使用SELECT命令验证是否成功插入了数据。 执行结果如下所示 - 如下图所示 -

  • 我正在尝试使用hector API将数据插入到cassandra数据库中。下面显示了我使用的代码。 但是在给定的keyspace下的/var/lib/cassandra/data文件夹中找不到任何插入的数据。数据插入似乎不能正常工作。代码有什么问题。下面显示了我用来创建'data'列族的命令。

  • 我已经创建了一个密钥空间。 我只有两个节点,DC1和数据中心1节点都已启动。现在,当我试图执行一批insert语句时 我收到一个例外,说 当我移除.ifNotExists()子句时,批处理执行没有任何异常。 使用数据轴驱动程序版本 2.1.7 。 我应该如何解决这个问题? 编辑:节点工具状态

  • 我想向数据库中插入值,但它无法工作,尽管我的代码在用作存储过程时运行良好。我需要使用按钮点击来存储值。请告诉我代码有什么问题。它没有显示任何错误或异常,但表中的数据没有更新

  • 我正在尝试使用Pig将HDFS中的文件中的数据复制到Cassandra中的表中。但在将数据存储在Cassandra中时,作业失败,出现空指针异常。有人能帮我吗? 用户表结构: 创建表用户(user\u id text主键、age int、第一个文本、最后一个文本) 我的猪脚本 > A=加载“/用户/hduser/用户。txt“使用PigStorage(',')作为(id:chararray,age