当前位置: 首页 > 知识库问答 >
问题:

Flink无法序列化scala类/任务不可序列化

钦景胜
2023-03-14

我有一个scala类,它有两个字段是val,但flink说它没有setter。因此,任务不可序列化。

我尝试使用 var 设置器,但随后它说重复设置器。Vals是公开的,那么为什么它要求二传手。Flink 版本 1.1.0

class Impression(val map: Map[String, String],val keySet:Set[String])

我的代码:

  val preAggregate = stream
.filter(impression => {
    true
})
 .map(impression => {
  val xmap = impression.map
  val values = valFunction(xmap)
  new ImpressionRecord(impression, values._1, values._2, values._3)
})

例外情况:

**class Impression does not contain a setter for field map**
19:54:49.995 [main] INFO  o.a.f.a.java.typeutils.TypeExtractor - class Impression is not a valid POJO type
19:54:49.997 [main] DEBUG o.a.flink.api.scala.ClosureCleaner$ - accessedFields: Map(class  -> Set())
Exception in thread "main" **org.apache.flink.api.common.InvalidProgramException: Task not serializable
at** org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at )
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 18 more

共有1个答案

漆雕令秋
2023-03-14

尽量不要直接使用类的字段,而是将其包装在方法中,这样您就可以开始了。

 类似资料:
  • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

  • 这给出的错误如下,任何帮助将是感激的:

  • 问题在于Spark数据集和INT列表的序列化。Scala版本是2.10.4,Spark版本是1.6。 这和其他问题类似,但是我不能基于这些回答让它工作。我已经简化了代码,以便仅仅显示问题。 我有一门案例课: 我的主要方法是: 我得到以下错误: 如果我从FlightExt中删除列表,那么一切正常,这表明lambda函数序列化没有问题。 Scala本身似乎序列化了一系列Int的优点。也许Spark在序

  • 我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:

  • 我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中

  • 我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:

  • 我正在做一个Flink项目,想将源JSON字符串数据解析为Json Object。我正在使用jackson-module-scala进行JSON解析。但是,在Flink API中使用JSON解析器时遇到了一些问题(例如)。 这里有一些代码示例,我无法理解为什么它会这样。 在本例中,我正在执行jackson module scala的官方exmaple代码告诉我的操作: > 创建一个新的 注册 <块

  • 问题内容: 我在android / java中对Location的子类进行序列化遇到了麻烦 位置不可序列化。我有一个名为FALocation的第一个子类,它没有任何实例变量。我已经宣布它可序列化。 然后,我有一个名为Waypoint的第二个类,看起来像这样: 序列化工作正常。 反序列化会产生跟随翼异常(腿对象包含一个航路点): 问题答案: 序列化位置绝对必要吗?也许您可以将其标记为瞬态,并在反序列