当前位置: 首页 > 知识库问答 >
问题:

Spark和Cassandra并行处理

龙星渊
2023-03-14

假设数组如下所示:

值ips=数组(1,2,3,4,5)

数组中最多可以有100.000个值。

ips.foreach(ip =>{
- read data from Casandra for specific "ip" // for each IP there is different amount of data to read (within the functions I determine start and end date for each IP)
- process it
- save it back to Cassandra})

另一方面,如果我这样做:

val IPRdd = sc.parallelize(Array(1,2,3,4,5))
IPRdd.foreach(ip => {
- read data from Cassandra // I need to use spark context to make the query
-process it
save it back to Cassandra})

我得到serialization异常,因为spark正在尝试序列化spark上下文,而spark上下文是不可序列化的。

如何使这个工作,但仍然利用并行性。

这是我得到的咒语:

共有1个答案

邬宏扬
2023-03-14

最简单的方法是使用Spark Cassandra连接器,它可以处理连接池和序列化。

有了它,你就可以做一些事情,比如

sc.parallelize(inputData, numTasks)
  .mapPartitions {  it =>
    val con = CassandraConnection(yourConf)
    con.withSessionDo{ session =>
      //Use the session
    }
    //Do any other processing
  }.saveToCassandra("ks","table"

这将是完全手动操作的卡桑德拉连接。所有会话都将被自动合并和缓存,如果您准备了一条语句,这些会话也将被缓存在执行器上。

sc.parallelize(inputData, numTasks)
  .joinWithCassandraTable("ks","table") //Retrieves all records for which input data is the primary key
  .map( //manipulate returned results if needed )
  .saveToCassandra("ks","table")
 类似资料:
  • 我正在编写一个从Cassandra获取数据的独立Spark程序。我遵循示例并通过newAPIHadoopRDD()和ClonFamilyInputFormat类创建了RDD。RDD已创建,但当我调用RDD的. groupByKey()方法时,我得到了一个NotSerializableException: 例外情况: JAVAio。NotSerializableException:java。nio。

  • null 问题1:Spark如何并行处理? 我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢? > 有多少节点用于创建DataFrame?(也许只有一个?) 假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB

  • 我试图使用Apache Spark来处理我的大型(230K条目)cassandra数据集,但我经常遇到不同类型的错误。然而,我可以成功地运行应用程序时,运行在一个数据集约200个条目。我有一个由3个节点和1个主节点和2个工作节点组成的spark设置,这两个工作节点还安装了一个cassandra集群,该集群的数据索引复制系数为2。我的两个spark workers在web界面上显示2.4和2.8GB

  • 如何在spark scala代码中为版本-DataStax spark Cassandra Connector 1.6.3设置以下Cassandra写参数。 Spark版本-1.6.2 spark.cassandra.output.batch.size.rows spark.cassandra.output.concurrent.writes spark.cassandra.output.batc

  • 我得到了一个错误:- 线程“main”java.lang.nosuchmethoderror:com.datastax.driver.core.queryoptions.setrefreshnodeintervalmillis(I)lcom/datastax/driver/core/queryoptions;**在com.datastax.spark.connector.cql.defaultCo

  • 是否可以将Spark cassandra connector Java API与Spark 2.0+一起使用? 我看到spark-cassandra-connector-java2.11的最新版本是1.6.0-M1。 有人知道连接器的Java API的未来吗? 沙伊