当前位置: 首页 > 知识库问答 >
问题:

序列化flink中kafka的对象

沈子实
2023-03-14

我试图使用flink从kafka中读取数据,执行一些函数,并将结果返回到不同的kafka主题,但出现以下错误`组织。阿帕奇。Flink。应用程序编程接口。常见的InvalidProgrameException:MapFunction的实现不可序列化。对象可能包含或引用不可序列化的字段。

我收到了来自kafka的消息-对其进行了一些操作,并返回了一个对象列表,我想发送到不同的主题。

class Wrapper implements Serializable{
        @JsonProperty("viewBuilderRequests")
        private ArrayList<ViewBuilderRequest> viewBuilderRequests;

        public Wrapper(){}

        public Wrapper(ArrayList<ViewBuilderRequest> viewBuilderRequests) {
            this.viewBuilderRequests = viewBuilderRequests;
        }

        public List<ViewBuilderRequest> getViewBuilderRequests() {
            return viewBuilderRequests;
        }

        public void setViewBuilderRequests(ArrayList<ViewBuilderRequest> viewBuilderRequests) {
            this.viewBuilderRequests = viewBuilderRequests;
        }
    }



public class ViewBuilderRequest implements Serializable {
    private CdmId cdmId
    private ViewBuilderOperation operation
    private List<ViewUserSystemIdentifier> viewUserSystemIdentifiers
    public ViewBuilderRequest(){
}

    public CdmId getCdmId() {
        return cdmId;
    }

    public void setCdmId(CdmId cdmId) {
        this.cdmId = cdmId;
    }

    public ViewBuilderOperation getOperation() {
        return operation;
    }

    public void setOperation(ViewBuilderOperation operation) {
        this.operation = operation;
    }

    public List<ViewUserSystemIdentifier> getViewUserSystemIdentifiers() {
        return viewUserSystemIdentifiers;
    }

    public void setViewUserSystemIdentifiers(List<ViewUserSystemIdentifier> viewUserSystemIdentifiers) {
        this.viewUserSystemIdentifiers = viewUserSystemIdentifiers;
    }

    public enum ViewBuilderOperation implements Serializable{
        Create, Update,Delete
    }




private MapFunction<String, Wrapper> parseAndSendToGraphProcessing = s ->{
    UserMatchingRequest userMatchingRequest = objectMapper.readValue(s, UserMatchingRequest.class);
    Wrapper wrapper = new Wrapper(janusGraphDataProcessing.handleMessage(userMatchingRequest));
    return wrapper;
};

内部类也实现了可序列化

此代码引发异常:

dataStream。映射(parseAndSendToGraphProcessing)。addSink(新Flinkkapkaproducer

我还对这两个对象进行了反序列化。

public class WrapperSchema implements DeserializationSchema<Wrapper>, SerializationSchema<Wrapper> {
//        private final static ObjectMapper objectMapper = new ObjectMapper().configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);

    static ObjectMapper objectMapper = new ObjectMapper();

    @Override
        public Wrapper deserialize(byte[] message) throws IOException {
            return objectMapper.readValue(message, Wrapper.class);
        }

        @Override
        public boolean isEndOfStream(Wrapper nextElement) {
            return false;
        }

        @Override
        public byte[] serialize(Wrapper element) {
//            return element.toString().getBytes();
            if(objectMapper == null) {
                objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
                objectMapper = new ObjectMapper();
            }
            try {
                String json = objectMapper.writeValueAsString(element);
                return json.getBytes();
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }

            return new byte[0];
        }

        @Override
        public TypeInformation<Wrapper> getProducedType() {
            return TypeInformation.of(Wrapper.class);
        }
}

共有1个答案

祝宾白
2023-03-14

要使flink工作,您的消息和地图函数都必须是可序列化的。

据我所知,你的消息看起来是可序列化的。

但是你的map函数不是。有时候很难让lambda序列化。我认为在您的案例中,问题在于parseAndSendToGraphProcessing使用的是objectMapperjanusGraphDataProcessing,它们必须是可序列化的。

我的猜测是janusGraphDataProcessing是不可序列化的(如果您使用的是jackson 2.1或更高版本,则OjbectMapper是)。

如果是这种情况,那么一种解决方法是编写一个自定义RichMapFunction类,该类将把janusGraphDataProcessing存储为瞬态变量,并在其open函数中对其进行初始化。

private MapFunction<String, Wrapper> parseAndSendToGraphProcessing = s ->{
    UserMatchingRequest userMatchingRequest = objectMapper.readValue(s, UserMatchingRequest.class);
    Wrapper wrapper = new Wrapper(janusGraphDataProcessing.handleMessage(userMatchingRequest));
    return wrapper;
};
 类似资料:
  • 我尝试为我的定制类实现一个方法,使用Flink-Kafka连接器生成关于Kafka的数据。类原型如下所示: 将数据写入特定Kafka主题的方法如下: 我有另一种方法可以从Kafka主题获取对象的字段中的数据,效果很好。现在尝试从Kafka主题获取数据并将其写入另一个Kafka主题时,我遇到了错误: 主要代码: Java似乎试图序列化对象,而不仅仅是字段

  • 我正在使用Apache Flink对流数据执行分析。 我正在使用一个依赖项,其对象需要超过10秒才能创建,因为它在初始化之前读取hdfs中存在的几个文件。 如果我在open方法中初始化对象,我会得到一个超时异常,如果在接收器/平面图的构造函数中,我会得到序列化异常。 目前,我正在使用静态块来初始化其他类中的对象,使用前提条件。在主文件中选中NOTNULL(mgGenerator.mgGenerat

  • 我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:

  • 我们正在考虑在我们的for消息传递中使用Kafka,我们的应用程序是使用Spring开发的。所以,我们已经计划用Spring-Kafka。 生产者将消息作为HashMap对象放入队列。我们有JSON序列化器,并且假设映射将被序列化并放入队列。这是生产者配置。 我们看到的文章很少,建议是这样做: 我们不想为创建反序列化程序编写一些代码。有没有我们缺少的样板?任何帮助都将不胜感激!!

  • 我的Flink管道目前使用一个Pojo,它包含一些列表和(字符串的)映射,如下所示

  • 我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一