当前位置: 首页 > 知识库问答 >
问题:

Flink Kafka生产者:类的对象不可序列化

乌骏
2023-03-14

我尝试为我的定制类实现一个方法,使用Flink-Kafka连接器生成关于Kafka的数据。类原型如下所示:

public class StreamData implements Serializable {
    private transient StreamExecutionEnvironment env;
    private DataStream<byte[]> data ;
    ...

将数据写入特定Kafka主题的方法如下:

public void writeDataIntoESB(String id) throws Exception {

        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>(
                "localhost:9092",
                id,
                new KeyedSerializationSchema<byte[]>() {
                    @Override
                    public byte[] serializeKey(byte[] bytes) {
                        return bytes;
                    }

                    @Override
                    public byte[] serializeValue(byte[] bytes) {
                        return bytes;
                    }

                    @Override
                    public String getTargetTopic(byte[] bytes) {
                        return null;
                    }
                });               
        data.addSink(producer);
    }

我有另一种方法可以从Kafka主题获取对象的data字段中的数据,效果很好。现在尝试从Kafka主题获取数据并将其写入另一个Kafka主题时,我遇到了错误:

org.apache.flink.api.common.InvalidProgramException: Object StreamData$2@1593948d is not serializable

主要代码:

StreamData temp = new StreamData();
temp = temp.getDataFromESB("data", 0);
temp.writeDataIntoESB("flink_test");

Java似乎试图序列化对象,而不仅仅是字段数据!使用Flink Kafka连接器向Kafka生成数据的代码已经过测试,并且可以正常使用(我的意思是不使用类,而是在main中编写所有代码)

如何消除错误?


共有3个答案

古起运
2023-03-14

使数据属性静态化,解决了问题。有人能详细说明这一点吗?这是一个好的解决方案吗?

private static DataStream<byte[]> data ;
郏瀚
2023-03-14

您也可以像这样在Flink中进行序列化

dataStream.addSink(new FlinkKafkaProducer<KafkaObject>(ProducerTopic, new 
                                             CustomSerializerSchema(),properties));
  public class CustomSerializerSchema implements SerializationSchema<MyUser> {

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(MyUser element) {
        byte[] b = null;
         try {
             b= new ObjectMapper().writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return b; 
    }
}
倪阳飇
2023-03-14

我认为问题的原因是您的代码正在执行以下操作:

new KeyedSerializationSchema<byte[]>() {...}

这段代码所做的是创建一个匿名的KeyedSeriazationSchema子类,作为定义类(StreamData)的内部类。每个内部类都持有对外部类实例的隐式引用,因此使用默认Java序列化规则序列化它也会传递地尝试序列化外部对象(StreamData)。解决此问题的最好方法是将您的KeyedSeriazationSchema子类声明为:

  • 新的顶级课程或

我认为最后一种方法是这样的:

public class StreamData {
    static KeyedSerializationSchema<byte[]> schema = new KeyedSerializationSchema<byte[]>() {
        ...
    };
    ...
    public void writeDataIntoESB(String id) throws Exception {

        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>("localhost:9092", id, schema);               
        data.addSink(producer);
    }
}
 类似资料:
  • 我有一个看起来很常见的问题,但到目前为止,我还没有找到一个适合我的解决方案。我想我只是错过了一些小事情,但我已经崩溃了,请求帮助。我正在尝试使用flask和pymongo获得json输出。 以下是控制台中使用print(结果)的对象: 当我试图返回时,我得到了错误: TypeError: ObjectId类型的对象不是JSON可序列化的 类联系人(资源): 我试过bson。json_util建议。

  • 我目前正在尝试扩展一个使用Scala和Spark的机器学习应用程序。我正在使用我在Github上找到的Dieterich Lawson之前项目的结构 https://github.com/dieterichlawson/admm 该项目基本上使用SparkContext来构建训练样本块的RDD,然后对每个样本集执行局部计算(例如求解线性系统)。 我遵循同样的方案,但为了进行局部计算,我需要对每个训

  • 我正在尝试将文件从mongoDB读取到本地。 我的代码如下:STRING=“myLocalPath”PATH=STRING.json 我得到错误-对象类型'ObjectId'是不是JSON序列化...请建议。

  • 我有一个api,我正在尝试使用Flask Pymongo存储/发布一个用户对象。 但是,我得到了以下错误 文件“/home/kay/.local/share/virtualenvs/server-iT4jZt3h/lib/python3.7/site-packages/flask/json/i│ 尼特。py”,第321行,在jsonify中 │ 转储(数据,缩进=缩进,分隔符=分隔符)“\n”,│

  • 我有以下用于序列化查询集的代码: 下面是我的 我需要将其序列化。但它说无法序列化

  • 我正在试验Stanford CoreNLP库,我想序列化主要的StanfordCoreNLP管道对象,尽管它抛出了一个java.io.NotSerializableException。 完整故事:每当我运行我的实现时,将管道注释器和分类器加载到内存中大约需要15秒。最终进程的内存约为600MB(很容易小到可以存储在我的机箱中)。我想在第一次创建管道后保存它,这样我就可以在以后将其读入内存。 然而,