public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf()
.setAppName("Example Spark App")
.setMaster("local[1]")
.set("spark.cassandra.connection.host", "127.0.0.1");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
sparkContext.hadoopConfiguration().set("fs.s3a.access.key", "XXXX");
sparkContext.hadoopConfiguration().set("fs.s3a.secret.key", "YYYY");
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "XXXXX");
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sparkContext.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
JavaPairRDD<String, PortableDataStream> javaPairRDD = sparkContext.binaryFiles("s3a://root/folder/");
File ROOT = createTempFolder().getAbsoluteFile();
JavaRDD<List<SearchEntity>> listJavaRDD = javaPairRDD.map(rdd -> {
System.out.println("Working on TAR: " + rdd._1);
DataInputStream stream = rdd._2.open();
// some preprocess
List<SearchEntity> toCassandraList = new WorkerTest(ROOT, stream).run();
return toCassandraList;
});
// here I want to take List<SearchEntity> toCassandraList and save them
// but I don't see how as it support only single object ..
CassandraJavaUtil.javaFunctions(listJavaRDD)
.writerBuilder("demoV2", "simple_search",
CassandraJavaUtil.mapToRow(List<SearchEntity> list objects ...)) // here is problem
.saveToCassandra();
System.out.println("Finish run s3ToCassandra:");
sparkContext.stop();
}
CREATE TABLE simple_search (
engine text,
term text,
time bigint,
rank bigint,
url text,
domain text,
pagenum bigint,
descr text,
display_url text,
title text,
type text,
PRIMARY KEY ((engine, term), time , url, domain, pagenum)
) WITH CLUSTERING ORDER BY (time DESC, url DESC, domain DESC , pagenum DESC);
Java和Scala解决方案都受到欢迎
要写入数据,您需要处理searchentity
,而不是searchentity
的列表。为此,您需要使用FlatMap
而不是普通的Map
:
JavaRDD<SearchEntity> entriesRDD = javaPairRDD.flatMap(rdd -> {
System.out.println("Working on TAR: " + rdd._1);
DataInputStream stream = rdd._2.open();
// some preprocess
List<SearchEntity> toCassandraList = new WorkerTest(ROOT, stream).run();
return toCassandraList;
});
然后您只需编写as per文档:
javaFunctions(rdd).writerBuilder("demoV2", "simple_search",
mapToRow(SearchEntity.class)).saveToCassandra();
附言。但是要小心,如果tar太大,在创建列表
时,可能会在workers上导致内存错误。根据tar文件中的文件格式,最好先解压缩数据,然后使用Spark读取数据。
我有一些Spark经验,但刚开始使用Cassandra。我正在尝试进行非常简单的阅读,但性能非常差——不知道为什么。这是我正在使用的代码: 所有3个参数都是表上键的一部分: 主键(group\u id,epoch,group\u name,auto\u generated\u uuid\u field),聚类顺序为(epoch ASC,group\u name ASC,auto\u generat
谁能告诉我为什么火花连接器要花这么多时间插入?我在代码中做了什么错误吗?或者使用spark-cassandra连接器进行插入操作是否不可取?
编辑1 当选择正确的scala版本时,它似乎会更进一步,但我不确定下面的输出是否仍然有需要解决的错误:
我们最近开始了使用Scala、Spark和Cassandra的大数据项目,我对所有这些技术都是新手。我试图做简单的任务写到和读从卡桑德拉表。如果将属性名和列名都保留为小写或snake大小写(unserscores)就可以实现这一点,但我希望在scala代码中使用camel大小写。在Scala中使用camel case格式,在Cassandra中使用snake case格式,有没有更好的方法来实现这
Java 1.8.0_151 Spark 2.2.1 Scala 2.11 卡桑德拉3.11.1
我正在使用Spark-Cassandra连接器1.1.0和Cassandra 2.0.12。 谢谢, 沙伊