当前位置: 首页 > 知识库问答 >
问题:

MapFunction的实现不是可序列化的Flink

樊琦
2023-03-14

对于初学者,我希望将所有输入数据流转换为KeyedStreams。因此,我将输入数据流映射为一个元组,然后应用KeyBy将其转换为KeyStream。

我总是遇到序列化的问题,我试着按照本指南https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html操作,但没有成功。

我想知道的是:

    null
public class CEP {

private  Integer streamsIdComp = 0;
final  private Map<Integer, DataStream<?> > dataStreams = new HashMap<>();
final  private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();

    KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
            map(new MapFunction<T, Tuple2<Integer,T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                    return Tuple2.of(streamsIdComp, value);
                }
            }).
            keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                    return integerTTuple2.f0;
                }
            });
    return keyedInputStream;
}

public <T1> void addInputStream(DataStream<T1> inputStream) {

    TypeInformation<T1> streamType = inputStream.getType();

    dataStreamsTypes.put(streamsIdComp, streamType);
    dataStreams.put(streamsIdComp, this.converttoKeyedStream(inputStream));
    streamsIdComp++;
}
}

测试类

public class CEPTest {

@Test
public void addInputStreamTest() throws Exception {
    //test if we can change keys in a keyedStream
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Record> input1 = env.fromElements(
            new Record("1", 1, "a"),
            new Record("2", 2, "b"),
            new Record("3", 3, "c"))
            .keyBy(Record::getBizName);

    DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4);

    CEP cepObject = new CEP();
    cepObject.addInputStream(input1);
    cepObject.addInputStream(input2);

   }
}

错误信息

org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction 
is not serializable. The implementation accesses fields of its enclosing class, which is a 
common reason for non-serializability. A common solution is to make the function a proper 
(non-inner) class, or a static inner class.

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
at CEP.converttoKeyedStream(CEP.java:25)
at CEP.addInputStream(CEP.java:45)
at CEPTest.addInputStreamTest(CEPTest.java:33)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.io.NotSerializableException: CEP
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 29 more

共有1个答案

云星波
2023-03-14

Flink是一个分布式框架。这意味着,您的程序可能会在数千个节点上运行。这也意味着每个工作节点必须接收要与所需上下文一起执行的代码。简化一点,流经系统的事件和要执行的函数都必须是可串行化的--因为它们是通过有线传输的。这就是为什么序列化通常在分布式编程中很重要的原因。

简而言之,序列化就是将数据编码成字节表示的过程,这些字节表示可以在另一个节点(另一个JVM)上传输和恢复。

回到问题上来。这是你的原因:

Caused by: java.io.NotSerializableException: CEP
return Tuple2.of(streamsIdComp, value);
public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();
    // note this variable is local
    int localStreamsIdComp = streamsIdComp;

    KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
            map(new MapFunction<T, Tuple2<Integer,T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                    // and is used here
                    return Tuple2.of(localStreamsIdComp, value);
                }
            }).
            keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                    return integerTTuple2.f0;
                }
            });
    return keyedInputStream;
}
 类似资料:
  • 我有一个没有扩展可串行化或可外部化接口的类,但在netbeans中使用FindBugs时仍然会遇到错误。有人能告诉我如何解决这个问题吗? 这是我的课 FindBug中的Bug:类分析器定义了一个非瞬时的非序列化实例字段objAnalyzerVar,对于objAnalyzerDataTypeInfo也是如此;

  • 如果一封邮件被发送到我的收件箱,我会收到一条消息,并将内容插入数据库。我有一个组织。springframework。整合。果心信息如下: 现在,如果出现故障,我希望有故障安全恢复机制,我想的是将消息对象序列化到一个文件中,然后反序列化并更新到DB。 问题1。在这种情况下,如何序列化消息对象?2。除了序列化,还可以使用其他机制吗? 编辑我以前没有做过序列化,我听说类应该实现Serializable,

  • 问题内容: 我在android / java中对Location的子类进行序列化遇到了麻烦 位置不可序列化。我有一个名为FALocation的第一个子类,它没有任何实例变量。我已经宣布它可序列化。 然后,我有一个名为Waypoint的第二个类,看起来像这样: 序列化工作正常。 反序列化会产生跟随翼异常(腿对象包含一个航路点): 问题答案: 序列化位置绝对必要吗?也许您可以将其标记为瞬态,并在反序列

  • 我正在学习Python和aws。 我想要的是从JSON响应中提取值。此代码适用于不包含日期值的JSON响应,但在本例中,响应包含日期值。 这是我的密码: 我得到了这个错误: 有人知道解决办法吗?thx

  • 考虑以下代码: 现在扩展了一个实现接口的类。类和是带有一堆getter和setter的POJOS。FindBugs抱怨和字段说: 这个Serializable类定义了一个非基元实例字段,它既不是瞬态的、Serializable的,也不java.lang.Object的,并且似乎没有实现Externalizable接口或readObject()和WriteObject()方法。 好吧,所以一切都很好

  • 如果类B扩展了类A,则类B实现可序列化,而类A有一个不可序列化的公共非静态初始化变量。。。尝试使用FileOutputStream的writeObject()方法写入类“B”对象时,将序列化从a继承的不可序列化成员,以便将其与类B的其余变量一起写入文件,还是将引发NotSerializableException?我试过了,效果不错,但我不知道为什么。。。所以我不确定它是否总是有效,或者我错过了什么