我正在做一个Flink项目,想将源JSON字符串数据解析为Json Object。我正在使用jackson-module-scala进行JSON解析。但是,在Flink API中使用JSON解析器时遇到了一些问题(例如map
)。
这里有一些代码示例,我无法理解为什么它会这样。
在本例中,我正在执行jackson module scala的官方exmaple代码告诉我的操作:
>
ObjectMapper
DefaultScalaModule
<块引用> DefaultScalaModule
是一个Scala对象,包括对所有当前支持的Scala数据类型的支持。
我得到的错误是:org.apache.flink.api.common.InvalidProgram Exception:
任务不可序列化
。
object JsonProcessing {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.readTextFile("xxx")
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))
// execute and print result
counts.print()
env.execute("JsonProcessing")
}
}
然后我在Google上搜索了一下,并提出了以下解决方案,其中将registerModule移动到map函数中。
val mapper = new ObjectMapper
val counts = text.map(l => {
mapper.registerModule(DefaultScalaModule)
mapper.readValue(l, classOf[Map[String, String]])
})
但是,我无法理解的是:为什么这将与外部定义的对象mapper
的调用方法一起工作?是因为ObjectMapper
本身是可序列化的,如这里所述ObjectMapper.java#L114?
现在,JSON解析工作正常,但是每次,我都必须调用mapper.register模块(DefaultScalaModule)
,我认为这可能会导致一些性能问题(是吗?)。我还尝试了另一种解决方案,如下所示。
我创建了一个新的case类Jsen
,并将其用作相应的解析类,注册Scala模块。而且它也工作正常。
然而,如果您的输入JSON经常变化,那么这就没有那么灵活了。管理类Jsen
是不可维护的。
case class Jsen(
@JsonProperty("a") a: String,
@JsonProperty("c") c: String,
@JsonProperty("e") e: String
)
object JsonProcessing {
def main(args: Array[String]) {
...
val mapper = new ObjectMapper
val counts = text.map(mapper.readValue(_, classOf[Jsen]))
...
}
此外,我还尝试使用JsonNode
,而不调用registerModule
,如下所示:
...
val mapper = new ObjectMapper
val counts = text.map(mapper.readValue(_, classOf[JsonNode]))
...
它也工作得很好。
我的主要问题是:实际上是什么导致了在寄存器模块(DefaultScalaModule)
的引擎盖下任务不可序列化的问题?
如何确定您的代码在编码过程中是否可能导致这种不可分解的问题?
问题是Apache Flink是为分布式设计的。这意味着它需要能够远程运行你的代码。这意味着所有的处理函数都应该是可序列化的。在当前的实现中,即使您不会在任何分布式模式下运行流式处理,也可以在构建流式处理的早期确保这一点。这是一个折衷方案,它的一个明显好处是向您提供反馈,直到违反此合同的那一行(通过异常堆栈跟踪)。
所以当你写作的时候
val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))
你实际上写的是
val counts = text.map(new Function1[String, Map[String, String]] {
val capturedMapper = mapper
override def apply(param: String) = capturedMapper.readValue(param, classOf[Map[String, String]])
})
这里重要的是您从外部上下文中捕获mapper
并将其存储为必须可序列化的Function1
对象的一部分。这意味着mapper
必须是可序列化的。Jackson库的设计者认识到这种需求,并且由于映射器中没有任何基本上不可序列化的东西,因此他们使他们的ObjectMapper
和默认的Module
s可序列化。不幸的是,Scala Jackson模块的设计者错过了这一点,并通过使ScalaTypeModifier
和所有子类不可序列化来使他们的DefaultScalaModule
深度不可序列化。这就是为什么您的第二个代码有效而第一个代码无效:“原始”ObjectMapper
是可序列化的,而ObjectMapper
具有预注册的DefaultScalaModule
不是。
有几个可能的解决办法。最简单的方法可能是包装ObjectMapper
object MapperWrapper extends java.io.Serializable {
// this lazy is the important trick here
// @transient adds some safety in current Scala (see also Update section)
@transient lazy val mapper = {
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper
}
def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
}
然后将其用作
val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
这个惰性
技巧有效,因为尽管DefaultScalaModule
的实例不可序列化,但创建DefaultScalaModule
实例的函数是。
更新:@瞬态怎么样?
如果我加上lazy val
和@transient lazy val
,这里有什么区别?
这实际上是一个棘手的问题。lazy val
的编译结果实际上是这样的:
object MapperWrapper extends java.io.Serializable {
// @transient is set or not set for both fields depending on its presence at "lazy val"
[@transient] private var mapperValue: ObjectMapper = null
[@transient] @volatile private var mapperInitialized = false
def mapper: ObjectMapper = {
if (!mapperInitialized) {
this.synchronized {
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
mapperValue = mapper
mapperInitialized = true
}
}
mapperValue
}
def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
}
其中,lazy val
上的@transient
会影响两个备份字段。现在你可以看到为什么lazy val
技巧有效了:
>
它在本地工作,因为它会延迟mapperValue
字段的初始化,直到第一次访问mapper
方法,因此该字段是安全的null
执行序列化检查时
它可以远程工作,因为MapperWrapper
是完全可序列化的,lazy val
应该如何初始化的逻辑被放入同一类的方法中(请参见def mapper
)。
然而,请注意,这种编译lazy val
的行为是当前Scala编译器的一个实现细节,而不是Scala规范的一部分。如果在以后的某个时候,一个类类似于。NetLazy
将被添加到Java标准库中,Scala编译器可能会开始生成不同的代码。这一点很重要,因为它为@transient
提供了一种权衡。现在添加@transient
的好处是,它可以确保这样的代码也能工作:
val someJson:String = "..."
val something:Something = MapperWrapper.readValue(someJson:String, ...)
val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
如果没有@transient
,上面的代码将失败,因为我们强制初始化了lazy
backing字段,现在它包含一个不可序列化的值。对于@transient
,这不是问题,因为该字段根本不会被序列化。
@transient
的一个潜在缺点是,如果Scala更改了生成惰性val
的代码的方式,并且该字段被标记为@transient
,那么在远程工作场景中,它实际上可能不会反序列化。
object
还有一个技巧,因为对于object
s,Scala编译器生成自定义反序列化逻辑(覆盖readResolve
)以返回相同的单例对象。这意味着包含lazy val
的对象并不是真正的反序列化对象,而是使用对象本身的值。这意味着在远程场景中,
@transient lazy val
内部的对象
比内部的类
更能证明未来。
我有一个从CSV文件读取的: 据我所知,是一个POJO: 我还有一个简单的课程: 但当我尝试使用它时,例如从测试中: 我得到以下错误: 我读过这些问题和答案,但运气不好: 任务不可序列化Flink 无法在scala中序列化任务 任务不可序列化:java。伊奥。仅在类而非对象上调用闭包外函数时NotSerializableException
我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中
null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?
我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:
问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是
这给出的错误如下,任何帮助将是感激的: