当前位置: 首页 > 知识库问答 >
问题:

Apache Sparkjava.lang.ClassCastException在远程主节点中运行forEach分区时[重复]

夔桐
2023-03-14

我有一个Java的微服务,它连接到Apache Spark集群,并使用Datastax Spark-Cassandra连接器将数据持久化到apache Cassandra DB集群。

我编写了以下方法来从Cassandra表中删除特定日期范围的数据。

具体代码如下所示:

public void deleteData(String fromDate, String toDate) {


    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
        put("keyspace", CassandraProperties.KEYSPACE);
        put("table", CassandraProperties.ENERGY_FORECASTS);
    }}).load()
            .filter(col("timestamp")
                    .substr(1, 10)
                    .between(fromDate, toDate))
            .select("nodeid");


    df.foreachPartition(partition -> {
        Session session = connector.openSession();
        while (partition.hasNext()) {
            Row row = partition.next();
            session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS + " WHERE nodeid = '" + row.mkString() + "' AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
        }
        session.close();
    });
}

}

    @Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", host)
            .config("spark.cassandra.connection.port", port)
            .config("spark.sql.caseSensitive", false)
            .master(master)
            .getOrCreate();

使用本地spark master节点(.master(“local[*]”)选项)运行时,代码执行良好。

但是,当我在连接到远程火花主节点时尝试执行相同的代码时,会发生以下错误:

驱动程序堆栈跟踪:]与根本原因tream.java:2069ClassCastException:无法将java.io.SerializedLambda的实例分配到字段tream.readDataset$$anonfuns$foreach分区$tream.java:1573类型的4美元java.io.ForeachPartionFunction在实例tream.defaultDataset$$anonfuns$foreachPartion2美元在tream.java:2287ObjectStreamClass$FieldRjava.io.ObjFieldValue(ObjectStreamCtream.read)在java.io.ObjectStreamClass.setObjFieldValue(ObjectStreamClass.java:1417)在java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)在java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)在java.io.ObjectInputStream.read普通对象(ObjectInputSjava.lang.)在java.lang.invoke.ObjectInputSorg.apache.spark.sql.Object0(ObjectInputS2.func)在org.apache.spark.api.java.function.ObjectInputSorg.apache.spark.sql.ReadFields(ObjectInputSjava.io.)在eflector.setObjectInputSlass.java:2287Serjava. io处的ObjectInputStream. read普通对象(ObjectInputStream. java: 2069)。java. io处的ObjectInputStream. readObject0(ObjectInputStream. java: 1573)。org. apache. spak.序列化器处的ObjectInputStream. readObject(ObjectInputStream. java: 431)。org. apache. spak.序列化器处的JavaDeserializationStream. readObject(JavaSerializer. scala: 75)。org. apache. spak.调度器处的JavaSerializerInstance.反序列化(JavaSerializer. scala: 114)。org. apache. spak.调度器处的结果任务.运行任务(任务. scala: 83)。org. apache. spak.执行器处的任务.运行(任务. scala: 123)。执行器$任务

更新1

似乎对我有用的技巧是在spik会话配置中添加以下行:

.config("spark.jars", "meter-service-1.0.jar")

这似乎提供了缺失的依赖项,这些依赖项阻止Spark在远程节点上正确反序列化lamda表达式。

这在这里解释得更好


共有1个答案

祁宾白
2023-03-14

我的JAVA很脆弱,但您能否尝试将lambda提取到方法中?

public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
        put("keyspace", CassandraProperties.KEYSPACE);
        put("table", CassandraProperties.ENERGY_FORECASTS);
    }}).load()
        .filter(col("timestamp")
                .substr(1, 10)
                .between(fromDate, toDate))
        .select("nodeid");


    df.foreachPartition(new ForeachPartitionFunction<Row>() {
        public void call(Iterator<Row> partition) {
            Session session = connector.openSession();
            while (partition.hasNext()) {
                Row row = partition.next();
                session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS + " WHERE nodeid = '" + row.mkString() + "' AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
            }
            session.close();
        }
    });
}
 类似资料:
  • 我的一台远程机器上有硒集线器,其他机器也有硒节点。 是有集线器的机器,我有一个本地节点和一个远程节点。 两者都连接到远程集线器,但是chrome不在远程节点上运行,而是在本地运行。 当我运行上面的代码(在远程节点上)时,我得到以下异常: 硒。常见的例外。WebDriverException:Message:u“未知错误:Chrome无法启动:崩溃\n(驱动程序信息:chromedriver=2.1

  • 机器A和B之间的通信工作良好。我可以运行像或这样的命令,它会给出预期的结果: 我听说过,但还没有尝试过,但据我所知,这并不能解决我的问题。 有什么方法可以直接使用来实现这一点。解决办法可能是使用连接到远程主机,并直接从远程主机使用客户机,但我希望尽可能避免这种解决方案。 在上运行,而不是在本地计算机上运行。

  • 问题内容: 是否可以(直接使用命令或API)从远程主机启动容器? 假设我有两台具有不同体系结构的机器:-A是一台机器-B是一台机器 我想使用我的A机器在B机器上运行一个容器。起初,我认为可以使用以下命令: 但这个命令实际上拉图像,并试图在其中一些最终的机器运行它的原因很明显,你不能运行的图像所特有的一种机器。 机器A和B之间的通信正常。我可以运行诸如或的命令,它可以给我预期的结果: 我听说过并且还

  • 我已经定义了partitioner类,它返回与网格大小相同的executionContext。执行上下文={part3=start=0,part1=start=0,part2=start=0} 日志:-

  • 问题内容: 我安装了WebMatrix,并按照以下说明在Windows 7计算机上安装IIS 7。 当我单击“运行”以运行我的快速节点应用程序时,浏览器弹出并告诉我 iisnode模块无法启动node.exe进程。确保node.exe可执行文件在web.config 的system.webServer/iisnode/@nodeProcessCommandLine元素中指定的位置可用。默认情况下,