假设数组如下所示:
值ips=数组(1,2,3,4,5)
数组中最多可以有100.000个值。
ips.foreach(ip =>{
- read data from Casandra for specific "ip" // for each IP there is different amount of data to read (within the functions I determine start and end date for each IP)
- process it
- save it back to Cassandra})
另一方面,如果我这样做:
val IPRdd = sc.parallelize(Array(1,2,3,4,5))
IPRdd.foreach(ip => {
- read data from Cassandra // I need to use spark context to make the query
-process it
save it back to Cassandra})
我得到serialization异常,因为spark正在尝试序列化spark上下文,而spark上下文是不可序列化的。
如何使这个工作,但仍然利用并行性。
这是我得到的咒语:
最简单的方法是使用Spark Cassandra连接器,它可以处理连接池和序列化。
有了它,你就可以做一些事情,比如
sc.parallelize(inputData, numTasks)
.mapPartitions { it =>
val con = CassandraConnection(yourConf)
con.withSessionDo{ session =>
//Use the session
}
//Do any other processing
}.saveToCassandra("ks","table"
这将是完全手动操作的卡桑德拉连接。所有会话都将被自动合并和缓存,如果您准备了一条语句,这些会话也将被缓存在执行器上。
sc.parallelize(inputData, numTasks)
.joinWithCassandraTable("ks","table") //Retrieves all records for which input data is the primary key
.map( //manipulate returned results if needed )
.saveToCassandra("ks","table")
我正在编写一个从Cassandra获取数据的独立Spark程序。我遵循示例并通过newAPIHadoopRDD()和ClonFamilyInputFormat类创建了RDD。RDD已创建,但当我调用RDD的. groupByKey()方法时,我得到了一个NotSerializableException: 例外情况: JAVAio。NotSerializableException:java。nio。
null 问题1:Spark如何并行处理? 我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢? > 有多少节点用于创建DataFrame?(也许只有一个?) 假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB
我试图使用Apache Spark来处理我的大型(230K条目)cassandra数据集,但我经常遇到不同类型的错误。然而,我可以成功地运行应用程序时,运行在一个数据集约200个条目。我有一个由3个节点和1个主节点和2个工作节点组成的spark设置,这两个工作节点还安装了一个cassandra集群,该集群的数据索引复制系数为2。我的两个spark workers在web界面上显示2.4和2.8GB
如何在spark scala代码中为版本-DataStax spark Cassandra Connector 1.6.3设置以下Cassandra写参数。 Spark版本-1.6.2 spark.cassandra.output.batch.size.rows spark.cassandra.output.concurrent.writes spark.cassandra.output.batc
我得到了一个错误:- 线程“main”java.lang.nosuchmethoderror:com.datastax.driver.core.queryoptions.setrefreshnodeintervalmillis(I)lcom/datastax/driver/core/queryoptions;**在com.datastax.spark.connector.cql.defaultCo
是否可以将Spark cassandra connector Java API与Spark 2.0+一起使用? 我看到spark-cassandra-connector-java2.11的最新版本是1.6.0-M1。 有人知道连接器的Java API的未来吗? 沙伊