当前位置: 首页 > 知识库问答 >
问题:

Flink如何使用-FasterXML/jackson数据格式文本-将CSV转换为POJO

卜阳
2023-03-14

我在课堂上收到了一个CSV,我需要获取用于装箱POJO的值。我不必在目录中打开“file.csv”,由Flink将逗号分隔的元素传递给EventDeserializationSchema,这个元素用于“Event Class”来处理每个事件。

以下是一个示例:

在:“‘亚当’、‘史密斯’,66,….‘12:01:00.000’”-

为此,我正在使用:https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv

这是我的事件类,应该做到这一点,实际上目前什么都没有做。

import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class Event implements Serializable {

    CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();
    
    CsvSchema schema = CsvSchema.emptySchema().withHeader();

    CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
    ObjectMapper mapper = new CsvMapper();
    mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);
    
    return Pojo
}

这是我的Pojo课程:

public class Pojo {
    
        public String firstName;
        public String lastName;
        private int age;
        public String time;

        public Pojo(String firstName, String lastName, int age, String time) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.age = age;
            this.time =time;
            
        }

}

如果能帮助全班同学归还Pojo,我们将不胜感激。

这是JSON:https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java的一个例子

单击evenclasshttps://html" target="_blank">github.com/apache/flink/blob/9dd04a25bd300a725486ff08560920f548f3b1d9/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaEvent.java#L27

共有1个答案

段干麒
2023-03-14

要使其工作,您需要为字段设置默认构造函数和getter/setters。我不明白您将在Event中做什么,以及为什么还有Pojo,但假设您想将传入的字符串反序列化为Event,这样的东西应该可以工作:

  1. 事件Pojo类:
public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}
public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
 类似资料:
  • 问题内容: 对于某些要求,我想将 文本文件(定界) 转换为 ORC(优化行列) 格式。由于必须定期运行它,因此我想编写一个 Java程序 来执行此操作。我不想使用Hive临时表解决方法。有人可以帮我吗?以下是我尝试过的 运行此命令将显示以下错误,并在本地生成一个名为 part-00000 的文件 问题答案: 您可以使用Spark数据帧非常轻松地将定界文件转换为orc格式。您还可以指定/施加模式并过

  • 问题内容: 我尝试使用json格式的文件作为输入。这是示例数据的片段。 可以在r中使用这种复杂的json格式制作一个csv,以便更平滑地处理数据吗? 例如,有以下基本类别:基本信息照片创建者位置类别网址 可以制作带有basic_information.id,creator.id等子类别类别的csv文件吗? 问题答案: 在研究您的答案时,我在评论中张贴了一些链接,但现在我非常确信这是解决问题的方法。

  • 我需要将长数据格式(long)转换为宽格式(wide),条件如下(如果可能): 1) 所有数据文件都将是具有相同结构(id、名称、值)的长格式(long),但每个数据文件将具有不同的变量、值和变量数: 2) 每个数据文件将是不同的变量混合物(因子、整数、数字)。有些因素可能每个案例都有多个级别(从长远来看是水果和肉),我想为这些因素中的每个级别创建一个单独的虚拟变量(逻辑)。因子和数值变量的数量将

  • 问题内容: 我正在使用以下代码: 它在我的IDE中抛出错误,但为什么呢?我已经使用了一些示例中的代码。 我不知道要写入什么,因为我想转换任何CSV文件。 如何将任何CSV文件转换为JSON并将其保存到磁盘? 接下来做什么?例子 问题答案: 我认为,您应该使用它来解决问题。请参见以下示例: 有关更多信息和示例,请参见此页面:jackson-dataformat-csv。

  • 我对Spark SQL很陌生。在执行一项培训任务时,我遇到了以下问题,无法找到答案(以下所有示例都有点愚蠢,但出于演示目的,应该仍然可以)。 我的应用程序读取拼花文件并根据其内容创建数据集: 数据集。show()调用结果: 然后,我将数据集转换为一个新的数据集,其中包含Person类型: 哪里 最后,当我显示数据集的内容时,我希望看到 然而,我明白了 这是toString()方法的结果,而标头是正