当前位置: 首页 > 知识库问答 >
问题:

带火花连接器的Cassandra-如何向Cassandra插入项目列表

袁志专
2023-03-14
public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
        .setAppName("Example Spark App")
        .setMaster("local[1]")
        .set("spark.cassandra.connection.host", "127.0.0.1");
        
    JavaSparkContext sparkContext = new JavaSparkContext(conf);
    sparkContext.hadoopConfiguration().set("fs.s3a.access.key", "XXXX");
    sparkContext.hadoopConfiguration().set("fs.s3a.secret.key", "YYYY");
    sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "XXXXX");
    sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    sparkContext.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
                
    JavaPairRDD<String, PortableDataStream> javaPairRDD = sparkContext.binaryFiles("s3a://root/folder/");
        
    File ROOT = createTempFolder().getAbsoluteFile();
        
    JavaRDD<List<SearchEntity>> listJavaRDD = javaPairRDD.map(rdd -> {
            System.out.println("Working on TAR: " + rdd._1);
        
            DataInputStream stream = rdd._2.open();
        
            // some preprocess
            List<SearchEntity> toCassandraList = new WorkerTest(ROOT, stream).run();
        
            return toCassandraList;
        });
        
    // here I want to take List<SearchEntity> toCassandraList and save them
    // but I don't see how as it support only single object ..
    CassandraJavaUtil.javaFunctions(listJavaRDD)
        .writerBuilder("demoV2", "simple_search", 
                       CassandraJavaUtil.mapToRow(List<SearchEntity> list objects ...)) // here is problem
        .saveToCassandra();
        
    System.out.println("Finish run s3ToCassandra:");
    sparkContext.stop();
}
CREATE TABLE simple_search (
    engine text,
    term text,
    time bigint,
    rank bigint,
    url text,
    domain text,
    pagenum bigint,
    descr text,
    display_url text,
    title text,
    type text,
    PRIMARY KEY ((engine, term), time , url, domain, pagenum)
) WITH CLUSTERING ORDER BY (time DESC, url DESC,  domain DESC , pagenum DESC);

Java和Scala解决方案都受到欢迎

共有1个答案

燕嘉颖
2023-03-14

要写入数据,您需要处理searchentity,而不是searchentity的列表。为此,您需要使用FlatMap而不是普通的Map:

JavaRDD<SearchEntity> entriesRDD = javaPairRDD.flatMap(rdd -> {
        System.out.println("Working on TAR: " + rdd._1);
        DataInputStream stream = rdd._2.open();
        // some preprocess
        List<SearchEntity> toCassandraList = new WorkerTest(ROOT, stream).run();
        return toCassandraList;
    });

然后您只需编写as per文档:

javaFunctions(rdd).writerBuilder("demoV2", "simple_search",
   mapToRow(SearchEntity.class)).saveToCassandra();

附言。但是要小心,如果tar太大,在创建列表 时,可能会在workers上导致内存错误。根据tar文件中的文件格式,最好先解压缩数据,然后使用Spark读取数据。

 类似资料:
  • 我有一些Spark经验,但刚开始使用Cassandra。我正在尝试进行非常简单的阅读,但性能非常差——不知道为什么。这是我正在使用的代码: 所有3个参数都是表上键的一部分: 主键(group\u id,epoch,group\u name,auto\u generated\u uuid\u field),聚类顺序为(epoch ASC,group\u name ASC,auto\u generat

  • 谁能告诉我为什么火花连接器要花这么多时间插入?我在代码中做了什么错误吗?或者使用spark-cassandra连接器进行插入操作是否不可取?

  • 编辑1 当选择正确的scala版本时,它似乎会更进一步,但我不确定下面的输出是否仍然有需要解决的错误:

  • 我们最近开始了使用Scala、Spark和Cassandra的大数据项目,我对所有这些技术都是新手。我试图做简单的任务写到和读从卡桑德拉表。如果将属性名和列名都保留为小写或snake大小写(unserscores)就可以实现这一点,但我希望在scala代码中使用camel大小写。在Scala中使用camel case格式,在Cassandra中使用snake case格式,有没有更好的方法来实现这

  • Java 1.8.0_151 Spark 2.2.1 Scala 2.11 卡桑德拉3.11.1

  • 我正在使用Spark-Cassandra连接器1.1.0和Cassandra 2.0.12。 谢谢, 沙伊