线程“streaming-job-executor-53”java.lang.noClassDefounderror:com/datastax/spark/connector/columnselector在com.enerbyte.spark.jobs.wattiopipeline.wattiopipelineStreamingJob$$anonFun$main$2.在com.enerbyte.spark.jobs.wattiopipelineStreamingJob.jobs.wattiopipelineStreamingJob.wattiopipelineStreamingJob.$$anonFun$main$2$anonfun$apply$mcv$sp$1.在org.apache.spark.streaming.dstream.foreachdstream$$anonfun$1$$anonfun$apply$mcv$sp$1.在org.apache.spark.streaming.dstream.foreachdstream.scala:50(foreachdstream.scala:50)在org.apache.spark.streaming.dstream.foreachdstream.scala:50)在org.apache.spark.streaming.dstream.foreachdstream.scala:50 foreachdstream$$anonfun$1.在org.apache.spark.streaming.dstream.foreachdstream$$anonfu应用$mcv$sp(foreachdstream.scala:49)n$1.在org.apache.spark.streaming.dstream.foreachdstream.scala:49)处应用(foreachdstream.scala:49)在scala.util.try处应用(foreachdstream.scala:49)在org.apache.spark.streaming.schedule.job.run处应用(try.scala:161)在org.apache.spark.streaming.schedule.job.run处应用(job.scala:39)在.在org.apache.spark.streaming.schedule.jobscheduler.scala:224)处应用(jobscheduler.scala:224)在scala.util.dynamicvariable.With Value(dynamicvariable.scala:57)在org.apache.spark.streaming.schedule.jobschedule.scala:223)在java.util.concurrent.threadpoolexecutor.junworker(threadpoolexecutor.worker.run(:617)在java.lang.thread.run(thread.java:745)处由:java.lang.ClassNotFoundException:com.datastax.spark.conne引起ctor.columnSelector在java.net.URLClassLoader.FindClass(URLClassLoader.java:381)在java.lang.ClassLoader.LoadClass(ClassLoader.java:424)在java.lang.ClassLoader.LoadClass(ClassLoader.java:357)
我为连接器添加了依赖关系,如下所示:
“com.datastax.spark”%%“Spark-Cassandra-连接器”%“1.5.0”%“提供”
这是我的申请html" target="_blank">代码:
val measurements = KafkaUtils.createDirectStream[
Array[Byte],
Array[Byte],
DefaultDecoder,
DefaultDecoder](ssc, kafkaConfig, Set("wattio"
))
.map {
case (k, v) => {
val decoder = new AvroDecoder[WattioMeasure](null,
WattioMeasure.SCHEMA$)
decoder.fromBytes(v)
}
}
//inserting into WattioRaw
WattioFunctions.run(WattioFunctions.
processWattioRaw(measurements))(
(rdd: RDD[
WattioTenantRaw], t: Time) => {
rdd.cache()
//get all the different tenants
val differentTenants = rdd.map(a
=> a.tenant).distinct().collect()
// for each tenant, create keyspace value and flush to cassandra
differentTenants.foreach(tenant => {
val keyspace = tenant + "_readings"
rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "wattio_raw")
})
rdd.unpersist(true)
}
)
ssc.checkpoint("/tmp")
ssc.start()
ssc.awaitTermination()
你需要确保你的罐子可以提供给工人。一旦作业开始执行,spark master将打开一个文件服务器。
您需要使用sparkcontext.setjars
或通过传递给spark-submit
的--jars
标志指定uber jar的路径。
从文档中
命令用于将数据插入到表的列中。 语法: 示例: 在之前的文章中,我们创建一个名为“”的表,其中包含列(, , ),需要在表中插入一些数据。 我们来看看向“”表中插入数据的代码 - 在执行上面语句插入数据后,可以使用SELECT命令验证是否成功插入了数据。 执行结果如下所示 - 如下图所示 -
我正在尝试使用hector API将数据插入到cassandra数据库中。下面显示了我使用的代码。 但是在给定的keyspace下的/var/lib/cassandra/data文件夹中找不到任何插入的数据。数据插入似乎不能正常工作。代码有什么问题。下面显示了我用来创建'data'列族的命令。
我已经创建了一个密钥空间。 我只有两个节点,DC1和数据中心1节点都已启动。现在,当我试图执行一批insert语句时 我收到一个例外,说 当我移除.ifNotExists()子句时,批处理执行没有任何异常。 使用数据轴驱动程序版本 2.1.7 。 我应该如何解决这个问题? 编辑:节点工具状态
Java和Scala解决方案都受到欢迎
我想向数据库中插入值,但它无法工作,尽管我的代码在用作存储过程时运行良好。我需要使用按钮点击来存储值。请告诉我代码有什么问题。它没有显示任何错误或异常,但表中的数据没有更新
我正在尝试使用Pig将HDFS中的文件中的数据复制到Cassandra中的表中。但在将数据存储在Cassandra中时,作业失败,出现空指针异常。有人能帮我吗? 用户表结构: 创建表用户(user\u id text主键、age int、第一个文本、最后一个文本) 我的猪脚本 > A=加载“/用户/hduser/用户。txt“使用PigStorage(',')作为(id:chararray,age