当前位置: 首页 > 知识库问答 >
问题:

java.util.List和java.util.Map的Flink序列化

姚实
2023-03-14

我的Flink管道目前使用一个Pojo,它包含一些列表和(字符串的)映射,如下所示

public class MyPojo {
    private List<String> myList = new ArrayList<>();
    private OtherPojo otherPojo = new OtherPojo();

    // getters + setters...
}

public class OtherPojo {
    private Map<String, String> myMap = new HashMap<>();

    // getters + setters...
}

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    ...

共有1个答案

裴浩歌
2023-03-14

Marius已经很好地解释了原因,尽管我不明白为什么Flink不支持您的用例。不过,我将添加现在有效的解决方案。

// create type info
final TypeInformation<OtherPojo> otherPojoInfo = Types.POJO(OtherPojo.class, 
    ImmutableMap.of("myMap", Types.MAP(Types.STRING, Types.STRING)));
final TypeInformation<MyPojo> myPojoInfo = Types.POJO(MyPojo.class,
    ImmutableMap.of("myList", Types.LIST(Types.STRING), "otherPojo", otherPojoInfo));

// test it
final MyPojo myPojo = new MyPojo();
myPojo.getMyList().add("test");
myPojo.getOtherPojo().getMyMap().put("ping", "pong");

final TypeSerializer<MyPojo> serializer = myPojoInfo.createSerializer(env.getConfig());
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(100);
serializer.serialize(myPojo, dataOutputSerializer);

DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(dataOutputSerializer.getSharedBuffer());
final MyPojo clone = serializer.deserialize(dataInputDeserializer);
assert(myPojo.equals(clone));

注意,测试代码中可怕的访问模式只是为了快速而肮脏的演示。

 类似资料:
  • 我试图使用flink从kafka中读取数据,执行一些函数,并将结果返回到不同的kafka主题,但出现以下错误`组织。阿帕奇。Flink。应用程序编程接口。常见的InvalidProgrameException:MapFunction的实现不可序列化。对象可能包含或引用不可序列化的字段。 我收到了来自kafka的消息-对其进行了一些操作,并返回了一个对象列表,我想发送到不同的主题。 内部类也实现了可

  • 下面有一个JSON,我试图使用Jackson将其解析为POJO 我的响应POJO片段如下 以及他们的吸气手和二传手 当我得到“org.codehaus.jackson.map.jsonMappingException:Can not deserialize instance of java.util.list out of START_OBJECT token”时,我的声明是否错误? 解析逻辑如下

  • Map是一个java集合,它以键值对的形式存储元素,并且不允许列表中的重复元素。 Map接口提供三个集合视图,允许将地图的内容视为一组键,值集合或键值映射集。 Map与映射表中的元素映射,并且可以使用java.util.HashMap初始化无序映射。 定义RDBMS表 (Define RDBMS Tables) 考虑一种情况,我们需要将员工记录存储在EMPLOYEE表中,该表具有以下结构 - cr

  • List是一个java集合,它按顺序存储元素并允许重复元素。 该界面的用户可以精确控制列表中的每个元素的插入位置。 用户可以通过整数索引访问元素,并搜索列表中的元素。 更正式地,列表通常允许元素对e1和e2成为e1.equals(e2),并且如果它们根本允许空元素,则它们通常允许多个空元素。 List与映射表中的元素映射,并使用java.util.ArrayList初始化。 定义RDBMS表 (D

  • 问题内容: 在我的servlet中,我有以下代码: 使用以下代码从ajax返回响应: 当我运行代码时,我仍然收到以下错误(来自ajax): 我还尝试在没有ajax的同一java页面中显示序列化对象,如下所示: 我收到此错误: 请帮忙! 编辑 : 此问题的最终解决方案 问题答案: 由于无限递归,发生 Exception 。您可能需要彻底检查代码,以防任何意外的递归。

  • 问题内容: 是否有方法或任何util在两个地图上执行交集?(通过“键”将两个地图相交) 我找不到任何东西。我总是可以实现自己的交集逻辑,但是我希望其中一个类中已经有一些操作可以做到这一点。 问题答案: 怎么样: 要么: