当前位置: 首页 > 知识库问答 >
问题:

Apache Flink-如何实现自定义反序列化器实现DeserializationSchema

苍宜修
2023-03-14

我用的是Flink我用的是Kafka连接器。我从flink收到的消息是一个逗号分隔的项目列表。“'a','b','c',1,0.1...'12:01:00.000'”其中一个包含事件时间,我想将此事件时间用于每个分区的水印(在kafka源代码中),然后将此事件时间用于会话窗口化。我的情况与通常情况有点不同,因为根据我的理解,人们通常使用“Kafka时间戳”和SimpleStringSchema()。在我的例子中,我不得不编写自己的反序列化器,实现DeserializationSchema并返回元组或POJO。所以基本上用我自己的函数替换SimpleStringSchema()。Flink提供了一些开箱即用的反序列化程序,但我真的不知道如何创建一个自定义的反序列化逻辑。

查看flink网站我发现:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html

https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/clickeventdeserializationschema.java

我真的需要一个例子,我可以如何做一个列表。上面提到的是JSON,所以给了我理论和概念,但我被困在那里了。

共有1个答案

阙奇思
2023-03-14

您应该像这样介绍POJO

public class Event implements Serializable {
    ...
    private Long timestamp;
}

并实现类似于链接中的简单反序列化器--您可以通过逗号手动拆分消息字符串来解析行,或者使用开箱即用的csv阅读器(如opencsv)将行解析为POJO:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    @Override
    public ClickEvent deserialize(byte[] message) throws IOException {
        String line = new String(message, StandardCharsets.UTF_8);
        String[] parts = line.split(",");
        
        Event event = new Event();
        // TODO: parts to event here
        return event;
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
 类似资料:
  • 本文向大家介绍Java使用@JsonDeserialize注解实现自定义反序列化器,包括了Java使用@JsonDeserialize注解实现自定义反序列化器的使用技巧和注意事项,需要的朋友参考一下 @JsonDeserialize注解用于在将JSON反序列化为Java对象时声明自定义反序列化器。我们可以通过使用泛型类型Employee 继承 StdDeserializer 类来实现自定义反序列化

  • 我有一个JSON作为字符串,我将其反序列化并实例化为scala的MyPOJO case类。我的数据是YYYY-MM-DD格式,但POJO createdBy中的实际属性是LocalDateTime。 如何在实例化Pojo时指定默认时间值2020-03-02 00:00:00, 序列化应返回yyyy-mm-dd格式。我的序列化和反序列化格式不同。 我尝试过自定义序列化和反序列化,如下所示,但由于缺少

  • 我需要实现我的自定义DefaultComboxModel。这样做的原因是每次我打电话给 或者 或者 我看到它自动触发一个项目状态更改事件。这会导致一些随机项目自动从列表中选择。这不是我想要的,因为它用随机选择的项目填充可编辑的JTextField。 这是我在使用我的自定义Itemlistener中的Thread.dumpStack()进行调试时看到的stacktrace,它是我在调用上述方法时看到

  • 本文向大家介绍如何在Java中使用Gson实现自定义JSON序列化?,包括了如何在Java中使用Gson实现自定义JSON序列化?的使用技巧和注意事项,需要的朋友参考一下 如果我们需要一种将 java对象转换为JSON的方法,则Gson库提供了一种通过向GsonBuilder 注册自定义序列化器来指定自定义序列化器的方法。我们可以通过覆盖com.google.gson.JsonSerializer

  • 本文向大家介绍.net实现序列化与反序列化实例解析,包括了.net实现序列化与反序列化实例解析的使用技巧和注意事项,需要的朋友参考一下 序列化与反序列化是.net程序设计中常见的应用,本文即以实例展示了.net实现序列化与反序列化的方法。具体如下: 一般来说,.net中的序列化其实就是将一个对象的所有相关的数据保存为一个二进制文件(注意:是一个对象) 而且与这个对象相关的所有类型都必须是可序列化的

  • 我有一个JestClient(elasticsearch)响应,我试图将其反序列化为一个对象。该对象包含两个DateTime字段,而在响应中,它们是字符串,因此我得到: 所以,我创建了一个自定义反序列化器来解决这个问题…然而,无论我做什么,我总是得到同样的错误。不知何故它没有注册使用它? 最后,我试图解析JestClient响应的代码: 无论我尝试什么,我总是得到上面的错误,我甚至不确定在这一点上