当前位置: 首页 > 知识库问答 >
问题:

具有repartitionByCassandraReplica函数的cassandra-spark-connector错误

仲和韵
2023-03-14

我试图使用1.2版本中的新联接功能,但在repl中的repartitionByCassandraReplica函数出现了一个错误。

我尝试复制该网站的示例,并创建了一个cassandra表(shopping_history),其中包含几个元素:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.mde

import com.datastax.spark.connector.rdd._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector._
import com.datastax.driver.core._

case class CustomerID(cust_id: Int)
val idsOfInterest = sc.parallelize(1 to 1000).map(CustomerID(_))
val repartitioned =  idsOfInterest.repartitionByCassandraReplica("cim_dev", "shopping_history", 10)
repartitioned.first()

我得到这个错误:

15/04/13 18:35:43 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, dev2-cim.aid.fr): java.lang.ClassNotFoundException: $line31.$read$$iwC$$iwC$CustomerID
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:344)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098)
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

谢谢

共有1个答案

隗高旻
2023-03-14

我将在这里回答你的第二个问题,如果我能根据更多的信息找到一些东西,我将继续第一部分。

我也很好奇:joinWithCassandraTable/cassandraTable与In子句/foreachPartition(withSessionDo)语法之间的差异。

带有in子句的CassandRatable将创建单个火花分区。因此,对于非常小的in子句可能没有问题,但是该子句必须从驱动程序序列化到spark应用程序。对于大in子句来说,这可能非常糟糕,而且一般来说,如果没有必要,我们不希望将数据从spark驱动程序来回发送到执行程序。

 类似资料:
  • 是否可以将Spark cassandra connector Java API与Spark 2.0+一起使用? 我看到spark-cassandra-connector-java2.11的最新版本是1.6.0-M1。 有人知道连接器的Java API的未来吗? 沙伊

  • 如何为版本设置以下属性: 本质上,我想设置它,以便应该有0个连接到我的远程dc的客户端,也应该有0个读/写。一切都应该是我正在磨合的DC本地的。 将设置为本地DC是否会达到相同的效果?

  • 我有一个关于这个连接器的问题。如果我的Spark集群和Cassandra集群不在同一个集群上,读取如何工作?Spark是否将整个Cassandra表带入自己的集群并将其重新排列到Spark分区中?

  • 我得到了一个错误:- 线程“main”java.lang.nosuchmethoderror:com.datastax.driver.core.queryoptions.setrefreshnodeintervalmillis(I)lcom/datastax/driver/core/queryoptions;**在com.datastax.spark.connector.cql.defaultCo

  • 10凯瑟琳 我在本地运行start-all.sh启动了Spark 然后我创建了这个类“SparkCassandraconnector”,它有一个连接spark和Cassandra的命令。

  • 我在让Spark Cassandra连接器在Scala中工作时遇到问题。 我正在使用这些版本: 斯卡拉 2.10.4 火花芯 1.0.2 卡桑德拉-节俭 2.1.0 (我安装的卡桑德拉是 v2.1.0) cassandra-clientutil 2.1.0 卡桑德拉驱动器核心 2.0.4 (推荐用于连接器? 火花-卡桑德拉-连接器 1.0.0 我可以连接并与卡桑德拉(没有火花)交谈,我可以与火花(