当前位置: 首页 > 知识库问答 >
问题:

Apache Spark与Cassandra行为

岑元徽
2023-03-14

我正在编写一个从Cassandra获取数据的独立Spark程序。我遵循示例并通过newAPIHadoopRDD()和ClonFamilyInputFormat类创建了RDD。RDD已创建,但当我调用RDD的. groupByKey()方法时,我得到了一个NotSerializableException:

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local").setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    Job job = new Job();
    Configuration jobConf = job.getConfiguration();
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    ConfigHelper.setInputInitialAddress(jobConf, host);
    ConfigHelper.setInputRpcPort(jobConf, port);
    ConfigHelper.setOutputInitialAddress(jobConf, host);
    ConfigHelper.setOutputRpcPort(jobConf, port);
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
    ConfigHelper.setInputPartitioner(jobConf,"Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(jobConf,"Murmur3Partitioner");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setFinish(new byte[0]);
    sliceRange.setStart(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(jobConf, predicate);

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
    spark.newAPIHadoopRDD(jobConf,
    ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
    ByteBuffer.class, SortedMap.class);

    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
    System.out.println(groupRdd.count());
}

例外情况:

JAVAio。NotSerializableException:java。nio。java上的HeapByteBuffer。io。ObjectOutputStream。java上的writeObject0(ObjectOutputStream.java:1164)。io。ObjectOutputStream。java上的defaultWriteFields(ObjectOutputStream.java:1518)。io。ObjectOutputStream。在java上编写SerialData(ObjectOutputStream.java:1483)。io。ObjectOutputStream。java上的writeOrdinaryObject(ObjectOutputStream.java:1400)。io。ObjectOutputStream。java上的writeObject0(ObjectOutputStream.java:1158)。io。ObjectOutputStream。组织中的writeObject(ObjectOutputStream.java:330)。阿帕奇。火花序列化程序。JavaSerializationStream。org上的writeObject(JavaSerializer.scala:42)。阿帕奇。火花存储DiskBlockObjectWriter。在org上编写(BlockObjectWriter.scala:179)。阿帕奇。火花调度程序。ShuffleMapTask$$anonfun$runTask$1。在组织中应用(ShuffleMapTask.scala:161)。阿帕奇。火花调度程序。ShuffleMapTask$$anonfun$runTask$1。在scala上应用(ShuffleMapTask.scala:158)。收集迭代器$类。foreach(迭代器.scala:727)位于org。阿帕奇。火花中断迭代器。foreach(interruptableiterator.scala:28)位于org。阿帕奇。火花调度程序。ShuffleMapTask。org上的runTask(ShuffleMapTask.scala:158)。阿帕奇。火花调度程序。ShuffleMapTask。运行任务(ShuffleMapTask.scala:99),位于组织。阿帕奇。火花调度程序。任务在组织上运行(Task.scala:51)。阿帕奇。火花执行人。执行者$TaskRunner。在java上运行(Executor.scala:187)。util。同时发生的ThreadPoolExecutor$工作者。java上的runTask(ThreadPoolExecutor.java:895)。util。同时发生的ThreadPoolExecutor$工作者。在java上运行(ThreadPoolExecutor.java:918)。lang.Thread。运行(Thread.java:662)

我要做的是将所有行键列合并到一个条目中。当我尝试使用reduceByKey()方法时,也会遇到同样的异常,如下所示:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, sortedMap<ByteBuffer, IColumn>>() {
        public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
            SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
            SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
            sortedMap.putAll(arg0);
            sortedMap.putAll(arg1);
            return sortedMap;
        }
    }
);

我正在使用:

  • spark-1.0.0-bin-hadoop1

有人知道问题出在哪里吗?序列化失败的原因是什么?

谢谢,
Shai

共有1个答案

羊新翰
2023-03-14

您的问题可能是由于尝试序列化ByteBuffers引起的。它们是不可序列化的,您需要在生成RDD之前将它们转换为字节数组。

您应该尝试官方DataStax Cassandra driver for Spark,此处提供

 类似资料:
  • 我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里

  • 问题内容: 我正在学习NoSQL,并正在为客户的需求之一寻找不同的选择。在提出这个问题之前,我已经遍历了各种资源(对NoSQL不太了解的人) 我需要以更快的速度存储数据并读取数据。 完全故障安全且易于扩展。 能够搜索数据以获取Google Analytics(分析)。 最后我列出了以下内容: 我所了解的是,Cassandra对我来说是一个完美的NoSQL存储解决方案,因为我可以使用索引写入数据和读

  • 给定一个包含以下格式数据的大文件(V1,V2,…,VN) 我正在尝试使用Spark获得一个类似于下面的配对列表 我尝试了针对一个较旧的问题所提到的建议,但我遇到了一些问题。例如, 我得到了错误, 有人能告诉我哪些地方我可能做得不对,或者有什么更好的方法可以达到同样的效果?非常感谢。

  • 我有一个项目的RDD,还有一个函数 。 收集RDD的两个小样本,然后这两个数组。这很好,但无法扩展。 有什么想法吗? 谢谢 编辑:下面是如何压缩每个分区中具有不同项数的两个示例: 关键是,虽然RDD的. zip方法不接受大小不等的分区,但迭代器的. zip方法接受(并丢弃较长迭代器的剩余部分)。

  • 主要内容:从二进制tar文件安装Apache Cassandra和Datastax企业级被不同组织用于存储大量数据。在安装Apache Cassandra之前,您必须具备以下事项: 必须拥有datastax社区版本,可以点击这里下载Cassandra3.10。 必须提前安装好JDK8以上版本。 必须提前安装好JDK。 最新版本的Java 8,要验证是否安装了正确版本的Java,请在终端上输入: 对于使用, 需要安装(一定要使用这

  • 您好,可以使用LazyDataModel与Cassandra一起在数据表中延迟加载数据。我以前一直在使用Oracle。使用Cassandra时会出现任何问题,并且在“谈论”这个问题时,最好了解延迟加载是如何工作的?每次从一个页面到另一个页面发生更改时都会加载数据吗?还是数据只是加载一次?任何帮助是值得赞赏的。 谢谢