在一个flink项目中,我使用一个case类click。
case class click( date: LocalDateTime, stbId:String, channelId :Int)
这个类填充了数据集,并且在日期为Java8java.time.localdatetime
的情况下可以很好地工作。在Java7环境中切换到org.joda(Version2.9)之后,对数据集中的click对象的调用不像以前那样执行。对click对象的date字段的某些函数的访问引发NullPointerExceptions
。这些函数的例子有gethorofday
toString
等。我能够确保click类的日期字段不为空。我怀疑joda时间库与kryo序列化的交互不是很好。参见joda DateTime格式导致spark RDD函数中的null指针错误或spark中的NPE。在Flink API中有org.apache.Flink.API.java.typeutils.runtime.kryo.Serializers和静态方法registerjodatime
。这似乎是相关的。我很用心地试着
import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)
这还不够。我这样做对吗?我如何使用java.typeutils.runtime.kryo?
使用的版本Flink:0.9.1.scala:2.10Joda.×2.9
跟进:下面是所建议的添加代码(感谢Fabian和Robert)
val env = ExecutionEnvironment.getExecutionEnvironment
//import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)
16:44:53,998 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
16:44:54,545 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$ - accessedFields: Map()
16:44:57,369 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
尽管如此,我还是目睹了以下几点
Exception in thread "main" java.lang.NullPointerException
at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
at java.lang.String.valueOf(Unknown Source)
at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
at myflink.click.toString(Ingestor.scala:20)
...
Flink将Kryo用于它无法序列化的类型。localdatetime
就是这样一个类。
遗憾的是,Kryo也不能正确地序列化它,所以我们不得不告诉Kryo如何通过为这个类提供一个专门的序列化器来实现它。
de.javakaffee:kryo-serializers
添加为依赖项:<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.30</version>
</dependency>
val env = ExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])
我希望这能有所帮助(我保留了以前的答案作为参考)
在Flink中调试KRYO/Serializer问题的一些一般性说明:
在本地执行作业时(也应该在./bin/flink
前端中工作,但输出可能在log/目录中),您应该看到如下内容:
14:05:52,863 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 15 registered types and 2 default Kryo serializers
14:05:52,943 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
14:05:53,150 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
使用调试日志级别(在log4j.properties
中将info
替换为DEBUG
),您实际上可以获得有关已注册序列化器的更详细信息:
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types:
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?
我是一个学习Kafka的新学生,我遇到了一些关于理解多个消费者的基本问题,到目前为止,文章、文档等都没有太大的帮助。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者,并同时运行他们,发布100个简单的消息到一个主题,并让我的消费者检索他们。我成功地做到了这一点,但是当我试图引入第二个消费者来消费刚刚发布消息的同一主题时,它没有收到任何消息。 我的理解是,对于每个主题,您可以有来自不同消
问题内容: 我的一部分代码获得了OuterHTML属性 因此我可以进行涉及解析的内容。 虽然在Firefox上的javascript中没有OuterHTML属性,但我找不到其他方法来获取此字符串。有想法吗? 问题答案: 弄清楚了! getAttribute不起作用,但是getAttributeNode很好; D
问题内容: 你们能用EPOLLHUP为我提供一个很好的示例代码来处理死者吗?我知道这是检测到用户断开连接的信号,但不确定如何在代码中使用它。 问题答案: 您没有检测到同级关闭(这表明套接字意外关闭,即通常是内部错误)。 使用它非常简单,只需将标志与您要赋予的其他标志“或”即可。因此,例如,代替write 。 之后,如果另一端关闭了连接(可能要关闭套接字),请执行随后要执行的操作。 请注意,从套接字
我是Android新手,我正在尝试使用UI线程,所以我写了一个简单的测试活动。但我想我误解了什么,因为点击按钮,应用程序不再响应
问题内容: 如何在Python 3.6 x64中使用 TensorFlow GPU 版本而不是 CPU 版本? Python正在使用我的 CPU 进行计算。 我可以注意到它,因为我有一个错误: 您的CPU支持该TensorFlow二进制文件未编译为使用的指令:AVX2 我已经安装了tensorflow和tensorflow-gpu。 如何切换到GPU版本? 问题答案: 遵循本教程Tensorflo