我刚接触Cassandra Spark,并尝试使用Spark主集群将数据从文件加载到Cassandra表。我遵循以下链接中给出的步骤
http://docs.datastax.com/en/datastax_enterprise/4.7/datastax_enterprise/spark/sparkImportTxtCQL.html
在第8步,数据显示为整数数组,但当我使用相同的命令时,结果显示为字符串Array[Array[String]]=Array(Array(6,7,8))
应用显式转换方法后例如
scala> val arr = Array("1", "12", "123")
arr: Array[String] = Array(1, 12, 123)
scala> val intArr = arr.map(_.toInt)
intArr: Array[Int] = Array(1, 12, 123)
结果显示为以下格式< code > res24:org . Apache . spark . rdd . rdd[Int]= MapPartitionsRDD[7]at map at
现在,在使用take函数或对其应用任何函数从中检索数据后,会出现以下错误
15/09/10 17:21:23 INFO SparkContext:开始作业:获取时间:36 15/09/10 16:21:23 INFODAGScheduler:获取作业23(获取时间::36),具有1个输出分区(allowLocal=true)15/09/10 15:21:23 INFO-DAGScheduler:最终阶段:ResultStage 23(获取位置:36)15/09/1017:21:23信息DAGScheddler:最终阶段的父级:ListDAGScheduler:正在提交ResultStage 23(MapPartitionsRDD[7],位于:33),其中没有缺失的父项15/09/10 17:21:23 INFO MemoryStore:ensureFreeSpace(3448),调用curMem=411425,maxMem=257918238 15/09/10 17:21:23 INFO MemoryStore:块broadcast_25作为值存储在内存中(估计大小为3.4 KB,可用245.6 MB)15/09/10 15:21:23 INFOMemoryStore:ensureFreeSpace(2023)调用curMem=414873,maxMem=257918238 15/09/10 17:21:23 INFO MemoryStore:块broadcast_25_piece0以字节形式存储在内存中(估计大小为2023.0 B,可用245.6 MB)15/09/10 15:21:23 INFOBlockManagerInfo:在192.168.1.137:57524(大小:2023.0 B、可用245.9 MB)上的内存中添加了broadcast_2 5_piece 0 15/09/1017:21:23信息SparkContext:从DAGScheduler广播创建广播25。scala:874 15/09/10 17:21:23 INFO DAGScheduler:从ResultStage 23提交1个缺少的任务(MapPartitionsRDD[7],位于:33)15/09/10 15:21:23 INFOTaskSchedulerImpl:添加任务集23.0和1个任务15/09/10 16:21:23 INFOtaskSetManager:在阶段23.0中启动任务0.0(TID 117,192.168.1.138,PROCESS_LOCAL,1512字节)15/09/1017:21:23信息块管理器信息:添加broadcast_25_piece0192.168.1.138:34977上的内存(大小:2023.0 B,可用空间:265.4 MB)15/09/10 17:21:23 WARN TaskSetManager:在阶段23.0中丢失任务0.0(TID 117,192.168.1.138):java.lang.ClassNotFoundException:$line67.$read$$iwC$$iwC$iwC$$iwC$$iwC$$iwC$iwC$$iwC$iwC$$URLClassLoader$1.run(URLClassLoader.java:355)位于java.security.AccessController。位于java.lang.ClassLoader.loadClass(ClassLoader.java:425)的java.net.URLClassLoader.findClass(URLClassLoader.java.354)处的doPrivileged(本机方法)位于java.lang.ClassLoader.loadClass(ClassLoader.java:358)处。位于org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)的java.lang.Class.forName(Class.java:274)的forName0(本机方法)位于java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:11612)的java.io.OObjectInputStream.read ClassDesc(ObjectInputStream.java:1517)的java.io ObjectInputStream/readOrdinaryObject(ObjectInputStream.java:1771)的java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)位于java.io.OObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)位于java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)位于java位于java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)的java.io.OObjectInputStream.defaultReadFields位于org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)位于org.apache.sspark.scheduleer.Task.run(Task.scala:70)位于org.apache.spark.executor.executor$TaskRunner.run(executor.scala:213)位于java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)位于java.lang.Thread.run(Thread.java:745)
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) atscala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
预先感谢你的帮助
Classpath中似乎没有连接驱动程序。
看这一点:
java.lang.ClassNotFoundException:
at java.lang.Class.forName(Class.java:274)
请检查您的项目并检查您的依赖项中是否有 Cassandra 连接器。
我希望我有所帮助。
我试图使用R将多个parquet文件加载到一个Spark表中。附加的代码显示了我是如何做到这一点的。
我想使用Spark Session2.2从HDFS中的Excel文件加载数据。下面是我的Java代码和我得到的异常。 我有个例外: java.lang.nosuchmethoderror:org.apache.poi.ss.usermodel.workbook.close()V at com.crealytics.spark.excel.excelrelation.com$crealytics$s
我正在通过SSIS将数据从csv文件加载到我的sql表中。是否对从csv文件读取的记录数指定了默认限制? 在加载csv文件时,我的数据流组件只处理5000条记录,尽管它包含5341条记录,如下面的图像所示。我如何修复这个问题?
问题内容: 我需要从多个JSON文件中加载数据,每个文件中都有多个记录到Postgres表中。我正在使用以下代码,但无法正常工作(在Windows上使用pgAdmin III) SAMPLE.JSON文件的内容是这样的(从许多这样的记录中得到两个记录): 问题答案: 试试这个:
尝试使用Apache Flink从Cassandra获取数据,引用本文,我可以读取数据,但我不知道如何将其加载到DataStream对象中。代码如下: 我试过了 将变量中的数据加载到数据流中