当前位置: 首页 > 知识库问答 >
问题:

我如何在flink中使用joda.time(或者我如何使用typeutils.runtime.kryo)

燕刚捷
2023-03-14

在一个flink项目中,我使用一个case类click。

case class click( date: LocalDateTime, stbId:String, channelId :Int)

这个类填充了数据集,并且在日期为Java8java.time.localdatetime的情况下可以很好地工作。在Java7环境中切换到org.joda(Version2.9)之后,对数据集中的click对象的调用不像以前那样执行。对click对象的date字段的某些函数的访问引发NullPointerExceptions。这些函数的例子有gethorofdaytoString等。我能够确保click类的日期字段不为空。我怀疑joda时间库与kryo序列化的交互不是很好。参见joda DateTime格式导致spark RDD函数中的null指针错误或spark中的NPE。在Flink API中有org.apache.Flink.API.java.typeutils.runtime.kryo.Serializers和静态方法registerjodatime。这似乎是相关的。我很用心地试着

import  org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)

这还不够。我这样做对吗?我如何使用java.typeutils.runtime.kryo?

使用的版本Flink:0.9.1.scala:2.10Joda.×2.9

跟进:下面是所建议的添加代码(感谢Fabian和Robert)

val env = ExecutionEnvironment.getExecutionEnvironment
//import  org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)
16:44:53,998 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE
16:44:54,545 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream                                        - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO  org.apache.flink.api.java.typeutils.TypeExtractor                 - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$                        - accessedFields: Map()
16:44:57,369 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE

尽管如此,我还是目睹了以下几点

Exception in thread "main" java.lang.NullPointerException
    at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
    at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
    at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
    at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
    at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
    at java.lang.String.valueOf(Unknown Source)
    at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
    at myflink.click.toString(Ingestor.scala:20)
    ...

共有1个答案

和柏
2023-03-14

Flink将Kryo用于它无法序列化的类型。localdatetime就是这样一个类。

遗憾的是,Kryo也不能正确地序列化它,所以我们不得不告诉Kryo如何通过为这个类提供一个专门的序列化器来实现它。

  1. de.javakaffee:kryo-serializers添加为依赖项:
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.30</version>
</dependency>
val env = ExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])

我希望这能有所帮助(我保留了以前的答案作为参考)

在Flink中调试KRYO/Serializer问题的一些一般性说明:

在本地执行作业时(也应该在./bin/flink前端中工作,但输出可能在log/目录中),您应该看到如下内容:

14:05:52,863 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 15 registered types and 2 default Kryo serializers 
14:05:52,943 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster. 
14:05:53,150 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started

使用调试日志级别(在log4j.properties中将info替换为DEBUG),您实际上可以获得有关已注册序列化器的更详细信息:

14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
 类似资料:
  • 我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?

  • 我是一个学习Kafka的新学生,我遇到了一些关于理解多个消费者的基本问题,到目前为止,文章、文档等都没有太大的帮助。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者,并同时运行他们,发布100个简单的消息到一个主题,并让我的消费者检索他们。我成功地做到了这一点,但是当我试图引入第二个消费者来消费刚刚发布消息的同一主题时,它没有收到任何消息。 我的理解是,对于每个主题,您可以有来自不同消

  • 问题内容: 我的一部分代码获得了OuterHTML属性 因此我可以进行涉及解析的内容。 虽然在Firefox上的javascript中没有OuterHTML属性,但我找不到其他方法来获取此字符串。有想法吗? 问题答案: 弄清楚了! getAttribute不起作用,但是getAttributeNode很好; D

  • 问题内容: 你们能用EPOLLHUP为我提供一个很好的示例代码来处理死者吗?我知道这是检测到用户断开连接的信号,但不确定如何在代码中使用它。 问题答案: 您没有检测到同级关闭(这表明套接字意外关闭,即通常是内部错误)。 使用它非常简单,只需将标志与您要赋予的其他标志“或”即可。因此,例如,代替write 。 之后,如果另一端关闭了连接(可能要关闭套接字),请执行随后要执行的操作。 请注意,从套接字

  • 我是Android新手,我正在尝试使用UI线程,所以我写了一个简单的测试活动。但我想我误解了什么,因为点击按钮,应用程序不再响应

  • 问题内容: 如何在Python 3.6 x64中使用 TensorFlow GPU 版本而不是 CPU 版本? Python正在使用我的 CPU 进行计算。 我可以注意到它,因为我有一个错误: 您的CPU支持该TensorFlow二进制文件未编译为使用的指令:AVX2 我已经安装了tensorflow和tensorflow-gpu。 如何切换到GPU版本? 问题答案: 遵循本教程Tensorflo