当前位置: 首页 > 知识库问答 >
问题:

即使使用POJO,任务也不能在Flink中序列化

袁华清
2023-03-14

我有一个从CSV文件读取的数据集

val dataSet = env.readCsvFile[ElecNormNew](
      getClass.getResource("/elecNormNew.arff").getPath,
      pojoFields = Array("date", "day", "period", "nswprice", "nswdemand", "vicprice", "vicdemand", "transfer", "label")

据我所知,ElecNormNew是一个POJO:

// elecNormNew POJO
class ElecNormNew(
  var date: Double,
  var day: Int,
  var period: Double,
  var nswprice: Double,
  var nswdemand: Double,
  var vicprice: Double,
  var vicdemand: Double,
  var transfer: Double,
  var label: String) extends Serializable {

  def this() = {
    this(0, 0, 0, 0, 0, 0, 0, 0, "")
  }
}

我还有一个简单的课程:

case class Discretizer[T](
  data: DataSet[T],
  nBins: Int = 5,
  s: Int = 1000) {

  private[this] val log = LoggerFactory.getLogger("Discretizer")
  private[this] val V = Vector.tabulate(10)(_ => IntervalHeap(nBins, 1, 1, s))

  private[this] def updateSamples(x: T): Vector[IntervalHeap] = {
    log.warn(s"$x")
    V
  }

  def discretize() = {
    data map (x => updateSamples(x))
  }
}

但当我尝试使用它时,例如从测试中:

val a = new Discretizer[ElecNormNew](dataSet)
a.discretize

我得到以下错误:

org.apache.flink.api.common.InvalidProgramException: Task not serializable
// ...
[info]     at com.elbauldelprogramador.discretizers.IDADiscretizer.discretize(IDADiscretizer.scala:69)
// ...
[info]     Cause: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
// ...

我读过这些问题和答案,但运气不好:

  • 任务不可序列化Flink
  • 无法在scala中序列化任务
  • 任务不可序列化:java。伊奥。仅在类而非对象上调用闭包外函数时NotSerializableException

共有1个答案

仲孙文乐
2023-03-14

我想说你提到的第一个链接提供了答案:

问题在于,您从MapFunction中引用了数据集页面。这是不可能的,因为数据集只是数据流的逻辑表示,不能在运行时访问。

离散化使用map,因此这里也适用。

 类似资料:
  • 问题内容: 我需要通过scp复制战争文件。 我将添加到,但是仍然出现此错误: 原因:找不到org.apache.tools.ant.taskdefs.optional.ssh.Scp类。这看起来像是Ant的可选组件之一。 这是为了以防万一: http://gist.github.com/320859 问题答案: 从诊断顶部看,这看起来像您的问题: 可选任务:不可用 您的$ ANT_HOME / l

  • 我正在做一个Flink项目,想将源JSON字符串数据解析为Json Object。我正在使用jackson-module-scala进行JSON解析。但是,在Flink API中使用JSON解析器时遇到了一些问题(例如)。 这里有一些代码示例,我无法理解为什么它会这样。 在本例中,我正在执行jackson module scala的官方exmaple代码告诉我的操作: > 创建一个新的 注册 <块

  • 在我的Flink应用程序中,我使用java.time.instant表示UTC时间戳。应用程序运行良好,但我最近在Flink日志中注意到这条消息: “类class java.time.instant不能用作POJO类型,因为并非所有字段都是有效的POJO字段,必须作为GenericType处理。有关对性能影响的详细信息,请阅读Flink文档中的”数据类型&序列化“。”

  • 在我的程序中,我有一个返回一些RDD的方法,我们称它为,它接受一个不可序列化的参数,并让RDD的类型为(我真正的RDD是元组类型,但只包含基元类型)。 当我尝试这样的事情时: 我得到的。 当我用替换(即某个常数)时,它会运行。 从序列化跟踪中,它试图序列化,并在那里阻塞,但我仔细检查了我的方法,这个对象从未出现在RDD中。 当我试图直接收集的输出时,即 我也没有问题。 该方法使用获取(本地)值序列

  • 我一直在使用Phonegap开发一个Android应用程序,现在我想让它在应用程序关闭时仍然可以在应用程序中执行java/js代码。所以我知道我需要创建一个服务。如果我在phonegap上创建服务插件,我还能执行javascript代码还是只执行java? 有人做过这样的事情吗?我发现了这个讨论,但似乎没有用:http://groups.google.com/group/phonegap/brow