当前位置: 首页 > 知识库问答 >
问题:

后端 - scala栈溢出的问题?

龚鸿雪
2025-03-16

代码:

package DAO

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, expr}

/**
 * Writes recommendation results and per-user batch bookkeeping to Redis
 * through `RedisDAO`.
 */
object resultSaver extends java.io.Serializable {

  import scala.util.control.NonFatal

  /**
   * Stores one batch of recommendations and records the batch id for every user.
   *
   * For each row of `input` two Redis hashes are touched:
   *  - hash `"topicName:batchId"`, field = user id, value = the formatted
   *    recommendation string `"(itemId,rating), (itemId,rating), ..."`;
   *  - hash keyed by the user id, field = `topicName`, value = the
   *    comma-separated list of batch ids seen so far, with `batchId` appended.
   *
   * Every Redis interaction is best-effort: a failure is logged to stdout and
   * processing continues with the next step / next user.
   *
   * NOTE(review): this method does not shorten the lineage of `input`. If task
   * deserialization overflows the stack, presumably the upstream plan needs to
   * be checkpointed before it is passed in - confirm against the caller.
   *
   * @param input     rows with a `userId` column (read via `getInt`, so it is
   *                  expected to be a non-null integer) and a `recommendations`
   *                  array of structs exposing `itemId` and `rating`
   * @param topicName topic name used in the Redis key / field
   * @param batchId   identifier of the batch being saved
   */
  def saveResult(input: DataFrame, topicName: String, batchId: String): Unit = {

    // Render the recommendations as a single string up front, inside Spark SQL
    val preparedData = input
      .select(
        col("userId"),
        expr("transform(recommendations, r -> concat('(', cast(r.itemId as string), ',', cast(r.rating as string), ')')) as recommendations")
      )
      .withColumn("recommendations", concat_ws(", ", col("recommendations")))
      .rdd
      .map(row => (row.getInt(0), row.getString(1)))

    preparedData.foreach { case (userId, recStr) =>
      val userIdStr = userId.toString

      // Save the recommendation result
      try {
        RedisDAO.hset(s"$topicName:$batchId", userIdStr, recStr)
      } catch {
        case NonFatal(e) =>
          // Single interpolated string: passing two arguments to println would
          // be auto-tupled and printed as "(message,detail)".
          println(s"Failed to write Redis for user $userIdStr: ${e.getMessage}")
      }

      // Save the user's batch information. The read is inside the try as well:
      // if it fails we must not overwrite the stored list with only the new id.
      try {
        val oldBatchIds = RedisDAO.hget(userIdStr, topicName).getOrElse("")
        val newBatchIds = if (oldBatchIds.isEmpty) batchId else s"$oldBatchIds,$batchId"
        RedisDAO.hset(userIdStr, topicName, newBatchIds)
      } catch {
        case NonFatal(e) =>
          println(s"Failed to write user batch for $userIdStr: ${e.getMessage}")
      }
    }
  }
}

错误:

25/03/14 19:45:23 ERROR Executor: Exception in task 0.0 in stage 182.0 (TID 582)
java.lang.StackOverflowError
    at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2053)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1634)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2053)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1634)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)

一直会报栈溢出
我尝试增大栈的大小:

// Pass a larger JVM thread stack size (-Xss128m) and the G1 collector flag to executors and driver.
// -Xss sets the per-thread stack size, not heap memory.
sparkConfig.set("spark.executor.extraJavaOptions", "-Xss128m -XX:+UseG1GC") // work around StackOverflowError by enlarging the thread stack
    // NOTE(review): presumably ineffective in client mode when set after the driver JVM has already started - confirm against the Spark configuration docs
    sparkConfig.set("spark.driver.extraJavaOptions", "-Xss128m -XX:+UseG1GC") // same JVM options for the driver

但是时好时坏
好像也不是代码的问题
我想根治这个问题
谢谢大神!!!

共有1个答案

吕飞翼
2025-03-16
package DAO

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, expr}

/**
 * Writes recommendation results and per-user batch bookkeeping to Redis
 * through `RedisDAO`.
 */
object resultSaver extends java.io.Serializable {

  import scala.util.control.NonFatal

  /**
   * Stores one batch of recommendations and records the batch id for every user.
   *
   * For each row of `input` two Redis hashes are touched:
   *  - hash `"topicName:batchId"`, field = user id, value = the formatted
   *    recommendation string `"(itemId,rating), (itemId,rating), ..."`;
   *  - hash keyed by the user id, field = `topicName`, value = the
   *    comma-separated list of batch ids seen so far, with `batchId` appended.
   *
   * Every Redis interaction is best-effort: a failure is logged to stdout and
   * processing continues with the next step / next user.
   *
   * @param input     rows with a `userId` column (cast to string here) and a
   *                  `recommendations` array of structs exposing `itemId` and
   *                  `rating`
   * @param topicName topic name used in the Redis key / field
   * @param batchId   identifier of the batch being saved
   */
  def saveResult(input: DataFrame, topicName: String, batchId: String): Unit = {

    // Render the user id and the recommendations as strings up front, inside Spark SQL
    val preparedData = input
      .select(
        col("userId").cast("string"),
        expr("transform(recommendations, r -> concat('(', cast(r.itemId as string), ',', cast(r.rating as string), ')')) as recommendations")
      )
      .withColumn("recommendations", concat_ws(", ", col("recommendations")))

    // Iterate partition by partition. Going through `.rdd` avoids the overload
    // ambiguity of Dataset.foreachPartition (Scala function vs. Java
    // ForeachPartitionFunction) that an untyped lambda hits on Scala 2.12.
    preparedData.rdd.foreachPartition { rows =>
      rows.foreach { row =>
        val userId = row.getString(0)
        val recStr = row.getString(1)

        // Save the recommendation result
        try {
          RedisDAO.hset(s"$topicName:$batchId", userId, recStr)
        } catch {
          case NonFatal(e) =>
            // Single interpolated string: passing two arguments to println
            // would be auto-tupled and printed as "(message,detail)".
            println(s"Failed to write Redis for user $userId: ${e.getMessage}")
        }

        // Save the user's batch information. The read is inside the try as
        // well: if it fails we must not overwrite the stored list with only
        // the new id.
        try {
          val oldBatchIds = RedisDAO.hget(userId, topicName).getOrElse("")
          val newBatchIds = if (oldBatchIds.isEmpty) batchId else s"$oldBatchIds,$batchId"
          RedisDAO.hset(userId, topicName, newBatchIds)
        } catch {
          case NonFatal(e) =>
            println(s"Failed to write user batch for $userId: ${e.getMessage}")
        }
      }
    }
  }
}

试一下上面这段代码:我做了一些优化,改用 foreachPartition 按分区处理,并在 SQL 中提前把 userId 转成字符串。

 类似资料:
  • Summary Stack overflows occur when variable size data is copied into fixed length buffers located on the program stack without any bounds checking. Vulnerabilities of this class are generally consider

  • 我有一个执行快速排序的应用程序。在我开始给它一些更大的数字(我第一次得到它是10000000)之前,它工作得很好。我知道是由递归引起的,但我不明白为什么我的应用程序会因此而崩溃。如有任何建议,将不胜感激。这是我的密码:

  • 遇到的问题:项目第一次运行时,没问题不报错,然后保存项目,热更新重新加载时,导致项目断开,需要重新运行, 已经设置了set NODE_OPTIONS=--max_old_space_size但是不起作用,请问是什么原因呢? 报错信息如下:

  • 问题内容: 下面给出的代码显示了运行时的Stackoverflow错误。但是,如果我使另一个类CarChange创建Car的对象,它将成功运行。我是一个初学者,请执行以下代码以了解在Java中进行向上转换的重要性。 问题答案: 出现 StackOverflowError 通常意味着代码中存在无限递归(方法不断地调用自身)。 收到此错误的原因是:您在 testdrive 方法中调用了 drive 方法,而在 drive 方法中又再次调用了 drive。

  • 每个任务都有一个自己的堆栈,如果任务使用xTaskCreate()创建,则任务堆栈会自动从堆内存上创建。如果使用xTaskCreateStatic()创建,则堆栈由开发者自己确定并提供。堆栈溢出是影响系统稳定性的一个常见因素,freeRTOS提供两种可选的用于检测和纠正堆栈溢出的机制。使用配置选项configCHECK_FOR_STACK_OVERFLOW设置。 注意,这些选择只在那些内存映射不是

  • 对于下面的输入,我得到一个StackOverflow错误。你们能帮我解释一下吗,以及如何在我的代码中解决这个问题。

  • events.js:377 RangeError [Error]: Maximum call stack size exceeded Emitted 'error' event on process instance at: npm ERR! code ELIFECYCLE npm ERR! errno 1 npm ERR! sea-wind-power@1.0.0 dev: vite --hos

  • 问题内容: 这有效:http : //play.golang.org/p/-Kv3xAguDR。 这导致堆栈溢出:http : //play.golang.org/p/1-AsHFj51O。 我不明白为什么。在这种情况下,使用接口的正确方法是什么? 问题答案: 这个 将呼叫您的,依次呼叫,等等。如果您需要解组JSON然后对其进行处理,那么一种巧妙的技术是声明一个本地类型,将数据解组到其中,然后转换