当前位置: 首页 > 知识库问答 >
问题:

Spark Shell:任务不可序列化

卜瀚漠
2023-03-14

我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
Class.forName("com.mysql.jdbc.Driver").newInstance

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf

val myRDD = new JdbcRDD( sc, () => DriverManager.getConnection(url,username,password) ,"select id from user limit ?, ?",1, 20, 10, r => r.getString("id")) ;
myRDD.foreach(println) 

我可以看到在控制台打印的ID。

object HelloWorld { 
       def sum(id : String): Unit = {
        val each_spark_rdd = uplink_rdd.select("number").where("id=?",Id).as((c: Int) => c).sum
        println(each_spark_rdd)
      }
  }
 val uplink_rdd = sc.cassandraTable("keyspace", "table")
scala> HelloWorld.sum("5") 
50

当我试图在每个提取id上运行相同的函数时

myRDD.map(HelloWorld.sum)
or
myRDD.foreach(HelloWorld.sum)
or 
for (id <- myRDD) HelloWorld.sum(id)

它给出与例外相同的例外

在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中

@transient val myRDD = new JdbcRDD ...
@transient val uplink_rdd = sc.cassandra....

共有1个答案

郏瀚
2023-03-14

您的代码试图在myrdd的转换中使用uplink_rdd。应用于RDD的闭包不能包含另一个RDD。

您应该按照joinwithcassandratable的方式来做一些事情,它将并行和分布式(ly?)使用myrdd中的信息从Cassandra中提取数据。如果从Cassandra中提取单个分区键,则此操作有效

查看文档

val cc = CassandraConnector(sc.getConf)
myRDD.mapPartitions { it =>
  cc.withSessionDo { session =>
    session.execute("whatever query you want")
  }
}

类似于

myRDD.collect.foreach(HelloWorld.sum)
 类似资料:
  • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

  • 我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:

  • 问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是

  • 这给出的错误如下,任何帮助将是感激的:

  • 我已经上了三节课 任务未序列化

  • 我想将转换流写入Elasticsearch索引,如下所示: 行抛出错误(见下文)。我尝试了不同的方法来解决这个问题(例如,在旁边添加),但似乎没有任何效果。 它是否与Hadoop的配置有关?(我参考了以下消息:) 更新: