当前位置: 首页 > 知识库问答 >
问题:

Apache Flink中的不可序列化对象

卫仲卿
2023-03-14

我正在使用Apache Flink对流数据执行分析

我正在使用一个依赖项,其对象需要超过10秒才能创建,因为它在初始化之前读取hdfs中存在的几个文件。

如果我在open方法中初始化对象,我会得到一个超时异常,如果在接收器/平面图的构造函数中,我会得到序列化异常。

目前,我正在使用静态块来初始化其他类中的对象,使用前提条件。在主文件中选中NOTNULL(mgGenerator.mgGenerator),如果在接收器的平面图中使用它,它就会工作。

有没有办法创建一个不可序列化的依赖对象,它可能需要超过10秒才能在Flink的平面图或接收器中初始化?

public class DependencyWrap {

  static MGenerator mGenerator;

  static {
    final String configStr = "{}";
    final Config config = new Gson().fromJson(config, Config.class);
    mGenerator = new MGenerator(config);
  }

}
public class MyStreaming {

  public static void main(String[] args) throws Exception {

    Preconditions.checkNotNull(MGenerator.mGenerator);
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
    ...
    input.flatMap(new RichFlatMapFunction<Map<String,Object>,List<String>>() {

      @Override
      public void open(Configuration parameters) {
      }

      @Override
      public void flatMap(Map<String,Object> value, Collector<List<String>> out) throws Exception {

        out.collect(MFVGenerator.mfvGenerator.generateMyResult(value.f0, value.f1));
      }

    });

  }
}

另外,如果我对这个问题的理解有误,请纠正我。

共有2个答案

袁开宇
2023-03-14

使用打开通常是加载外部查找源的正确位置。超时有点奇怪,可能有一个配置围绕它。

但是,如果使用静态加载器(无论是静态类还是单例)的好处是,您只需要在同一个任务管理器上为任务的所有并行实例加载一次。因此,您可以节省内存和CPU时间。这对您来说尤其如此,因为您在两个不同的任务中使用相同的数据结构。此外,静态加载器可以在第一次使用时延迟初始化,以避免open中的超时。

这种方法的明显缺点是代码的可测试性受到影响。有一些方法可以解决这个问题,如果有兴趣,我可以扩展这些方法。

我看不到使用代理序列化器模式的好处。它不必要地复杂(Java中的自定义序列化),并且没有什么好处。

李俊雅
2023-03-14

在Open方法中执行此操作是100%正确的方法。Flink是为您提供超时异常还是对象?

作为最后一种方法,您可以将对象包装在一个包含对象及其JSON字符串或Config(Config可序列化吗?)的类中,对象标记为瞬态,然后覆盖ReadObject/WriteObject方法以调用构造函数。如果mGenerator对象本身是无状态的(如果不是,您将遇到其他问题),则在将作业分发给任务管理器时,序列化代码应该只调用一次。

 类似资料:
  • 我正在试验Stanford CoreNLP库,我想序列化主要的StanfordCoreNLP管道对象,尽管它抛出了一个java.io.NotSerializableException。 完整故事:每当我运行我的实现时,将管道注释器和分类器加载到内存中大约需要15秒。最终进程的内存约为600MB(很容易小到可以存储在我的机箱中)。我想在第一次创建管道后保存它,这样我就可以在以后将其读入内存。 然而,

  • 我有以下用于序列化查询集的代码: 下面是我的 我需要将其序列化。但它说无法序列化

  • 我目前正在尝试扩展一个使用Scala和Spark的机器学习应用程序。我正在使用我在Github上找到的Dieterich Lawson之前项目的结构 https://github.com/dieterichlawson/admm 该项目基本上使用SparkContext来构建训练样本块的RDD,然后对每个样本集执行局部计算(例如求解线性系统)。 我遵循同样的方案,但为了进行局部计算,我需要对每个训

  • 问题内容: 我在android / java中对Location的子类进行序列化遇到了麻烦 位置不可序列化。我有一个名为FALocation的第一个子类,它没有任何实例变量。我已经宣布它可序列化。 然后,我有一个名为Waypoint的第二个类,看起来像这样: 序列化工作正常。 反序列化会产生跟随翼异常(腿对象包含一个航路点): 问题答案: 序列化位置绝对必要吗?也许您可以将其标记为瞬态,并在反序列

  • 问题内容: 当我尝试运行以下代码时: 我得到以下异常: 如何成功使用包含s的对象? 问题答案: 在序列化之前将集合变成列表,或使用自定义处理程序来这样做:

  • 我有一个看起来很常见的问题,但到目前为止,我还没有找到一个适合我的解决方案。我想我只是错过了一些小事情,但我已经崩溃了,请求帮助。我正在尝试使用flask和pymongo获得json输出。 以下是控制台中使用print(结果)的对象: 当我试图返回时,我得到了错误: TypeError: ObjectId类型的对象不是JSON可序列化的 类联系人(资源): 我试过bson。json_util建议。