当前位置: 首页 > 知识库问答 >
问题:

Flink在map中解析JSON:InvalidProgram Exception:任务不可序列化

公孙成仁
2023-03-14

我正在做一个Flink项目,想将源JSON字符串数据解析为Json Object。我正在使用jackson-module-scala进行JSON解析。但是,在Flink API中使用JSON解析器时遇到了一些问题(例如map)。

这里有一些代码示例,我无法理解为什么它会这样。

在本例中,我正在执行jackson module scala的官方exmaple代码告诉我的操作:

>

  • 创建一个新的ObjectMapper
  • 注册DefaultScalaModule <块引用>

    DefaultScalaModule是一个Scala对象,包括对所有当前支持的Scala数据类型的支持。

    我得到的错误是:org.apache.flink.api.common.InvalidProgram Exception:任务不可序列化

    object JsonProcessing {
      def main(args: Array[String]) {
    
        // set up the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // get input data
        val text = env.readTextFile("xxx")
    
        val mapper = new ObjectMapper
        mapper.registerModule(DefaultScalaModule)
        val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))
    
        // execute and print result
        counts.print()
    
        env.execute("JsonProcessing")
      }
    
    }
    

    然后我在Google上搜索了一下,并提出了以下解决方案,其中将registerModule移动到map函数中。

    val mapper = new ObjectMapper
    val counts = text.map(l => {
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue(l, classOf[Map[String, String]])
    })
    

    但是,我无法理解的是:为什么这将与外部定义的对象mapper的调用方法一起工作?是因为ObjectMapper本身是可序列化的,如这里所述ObjectMapper.java#L114?

    现在,JSON解析工作正常,但是每次,我都必须调用mapper.register模块(DefaultScalaModule),我认为这可能会导致一些性能问题(是吗?)。我还尝试了另一种解决方案,如下所示。

    我创建了一个新的case类Jsen,并将其用作相应的解析类,注册Scala模块。而且它也工作正常。

    然而,如果您的输入JSON经常变化,那么这就没有那么灵活了。管理类Jsen是不可维护的。

    case class Jsen(
      @JsonProperty("a") a: String,
      @JsonProperty("c") c: String,
      @JsonProperty("e") e: String
    )
    
    object JsonProcessing {
      def main(args: Array[String]) {
        ...
        val mapper = new ObjectMapper
        val counts = text.map(mapper.readValue(_, classOf[Jsen]))
        ...
    
    }
    

    此外,我还尝试使用JsonNode,而不调用registerModule,如下所示:

        ...
        val mapper = new ObjectMapper
        val counts = text.map(mapper.readValue(_, classOf[JsonNode]))
        ...
    

    它也工作得很好。

    我的主要问题是:实际上是什么导致了在寄存器模块(DefaultScalaModule)的引擎盖下任务不可序列化的问题?

    如何确定您的代码在编码过程中是否可能导致这种不可分解的问题?

  • 共有1个答案

    乐华晖
    2023-03-14

    问题是Apache Flink是为分布式设计的。这意味着它需要能够远程运行你的代码。这意味着所有的处理函数都应该是可序列化的。在当前的实现中,即使您不会在任何分布式模式下运行流式处理,也可以在构建流式处理的早期确保这一点。这是一个折衷方案,它的一个明显好处是向您提供反馈,直到违反此合同的那一行(通过异常堆栈跟踪)。

    所以当你写作的时候

    val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))
    

    你实际上写的是

    val counts = text.map(new Function1[String, Map[String, String]] {
        val capturedMapper = mapper
    
        override def apply(param: String) = capturedMapper.readValue(param, classOf[Map[String, String]])
    })
    

    这里重要的是您从外部上下文中捕获mapper并将其存储为必须可序列化的Function1对象的一部分。这意味着mapper必须是可序列化的。Jackson库的设计者认识到这种需求,并且由于映射器中没有任何基本上不可序列化的东西,因此他们使他们的ObjectMapper和默认的Modules可序列化。不幸的是,Scala Jackson模块的设计者错过了这一点,并通过使ScalaTypeModifier和所有子类不可序列化来使他们的DefaultScalaModule深度不可序列化。这就是为什么您的第二个代码有效而第一个代码无效:“原始”ObjectMapper是可序列化的,而ObjectMapper具有预注册的DefaultScalaModule不是。

    有几个可能的解决办法。最简单的方法可能是包装ObjectMapper

    object MapperWrapper extends java.io.Serializable {
      // this lazy is the important trick here
      // @transient adds some safety in current Scala (see also Update section)
      @transient lazy val mapper = {
        val mapper = new ObjectMapper
        mapper.registerModule(DefaultScalaModule)
        mapper
      }
    
      def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
    } 
    

    然后将其用作

    val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
    

    这个惰性技巧有效,因为尽管DefaultScalaModule的实例不可序列化,但创建DefaultScalaModule实例的函数是。

    更新:@瞬态怎么样?

    如果我加上lazy val@transient lazy val,这里有什么区别?

    这实际上是一个棘手的问题。lazy val的编译结果实际上是这样的:

    object MapperWrapper extends java.io.Serializable {
    
      // @transient is set or not set for both fields depending on its presence at "lazy val" 
      [@transient] private var mapperValue: ObjectMapper = null
      [@transient] @volatile private var mapperInitialized = false
    
      def mapper: ObjectMapper = {
        if (!mapperInitialized) {
          this.synchronized {
            val mapper = new ObjectMapper
            mapper.registerModule(DefaultScalaModule)
            mapperValue = mapper
            mapperInitialized = true
          }
        }
        mapperValue
      }
    
    
      def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
    }
    

    其中,lazy val上的@transient会影响两个备份字段。现在你可以看到为什么lazy val技巧有效了:

    >

  • 它在本地工作,因为它会延迟mapperValue字段的初始化,直到第一次访问mapper方法,因此该字段是安全的null执行序列化检查时

    它可以远程工作,因为MapperWrapper是完全可序列化的,lazy val应该如何初始化的逻辑被放入同一类的方法中(请参见def mapper)。

    然而,请注意,这种编译lazy val的行为是当前Scala编译器的一个实现细节,而不是Scala规范的一部分。如果在以后的某个时候,一个类类似于。NetLazy将被添加到Java标准库中,Scala编译器可能会开始生成不同的代码。这一点很重要,因为它为@transient提供了一种权衡。现在添加@transient的好处是,它可以确保这样的代码也能工作:

    val someJson:String = "..."
    val something:Something = MapperWrapper.readValue(someJson:String, ...)
    val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
    

    如果没有@transient,上面的代码将失败,因为我们强制初始化了lazybacking字段,现在它包含一个不可序列化的值。对于@transient,这不是问题,因为该字段根本不会被序列化。

    @transient的一个潜在缺点是,如果Scala更改了生成惰性val的代码的方式,并且该字段被标记为@transient,那么在远程工作场景中,它实际上可能不会反序列化。

    object还有一个技巧,因为对于objects,Scala编译器生成自定义反序列化逻辑(覆盖readResolve)以返回相同的单例对象。这意味着包含lazy val的对象并不是真正的反序列化对象,而是使用对象本身的值。这意味着在远程场景中,@transient lazy val内部的对象比内部的更能证明未来。

  •  类似资料:
    • 我有一个从CSV文件读取的: 据我所知,是一个POJO: 我还有一个简单的课程: 但当我尝试使用它时,例如从测试中: 我得到以下错误: 我读过这些问题和答案,但运气不好: 任务不可序列化Flink 无法在scala中序列化任务 任务不可序列化:java。伊奥。仅在类而非对象上调用闭包外函数时NotSerializableException

    • 我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中

    • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

    • 我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:

    • 问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是

    • 这给出的错误如下,任何帮助将是感激的: