我正在使用Apache Flink对流数据执行分析。
我正在使用一个依赖项,其对象需要超过10秒才能创建,因为它在初始化之前读取hdfs中存在的几个文件。
如果我在open方法中初始化对象,我会得到一个超时异常,如果在接收器/平面图的构造函数中,我会得到序列化异常。
目前,我正在使用静态块来初始化其他类中的对象,使用前提条件。在主文件中选中NOTNULL(mgGenerator.mgGenerator),如果在接收器的平面图中使用它,它就会工作。
有没有办法创建一个不可序列化的依赖对象,它可能需要超过10秒才能在Flink的平面图或接收器中初始化?
public class DependencyWrap {
static MGenerator mGenerator;
static {
final String configStr = "{}";
final Config config = new Gson().fromJson(config, Config.class);
mGenerator = new MGenerator(config);
}
}
public class MyStreaming {
public static void main(String[] args) throws Exception {
Preconditions.checkNotNull(MGenerator.mGenerator);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
...
input.flatMap(new RichFlatMapFunction<Map<String,Object>,List<String>>() {
@Override
public void open(Configuration parameters) {
}
@Override
public void flatMap(Map<String,Object> value, Collector<List<String>> out) throws Exception {
out.collect(MFVGenerator.mfvGenerator.generateMyResult(value.f0, value.f1));
}
});
}
}
另外,如果我对这个问题的理解有误,请纠正我。
使用打开通常是加载外部查找源的正确位置。超时有点奇怪,可能有一个配置围绕它。
但是,如果使用静态加载器(无论是静态类还是单例)的好处是,您只需要在同一个任务管理器上为任务的所有并行实例加载一次。因此,您可以节省内存和CPU时间。这对您来说尤其如此,因为您在两个不同的任务中使用相同的数据结构。此外,静态加载器可以在第一次使用时延迟初始化,以避免open
中的超时。
这种方法的明显缺点是代码的可测试性受到影响。有一些方法可以解决这个问题,如果有兴趣,我可以扩展这些方法。
我看不到使用代理序列化器模式的好处。它不必要地复杂(Java中的自定义序列化),并且没有什么好处。
在Open方法中执行此操作是100%正确的方法。Flink是为您提供超时异常还是对象?
作为最后一种方法,您可以将对象包装在一个包含对象及其JSON字符串或Config(Config可序列化吗?)的类中,对象标记为瞬态,然后覆盖ReadObject/WriteObject方法以调用构造函数。如果mGenerator对象本身是无状态的(如果不是,您将遇到其他问题),则在将作业分发给任务管理器时,序列化代码应该只调用一次。
我正在试验Stanford CoreNLP库,我想序列化主要的StanfordCoreNLP管道对象,尽管它抛出了一个java.io.NotSerializableException。 完整故事:每当我运行我的实现时,将管道注释器和分类器加载到内存中大约需要15秒。最终进程的内存约为600MB(很容易小到可以存储在我的机箱中)。我想在第一次创建管道后保存它,这样我就可以在以后将其读入内存。 然而,
我有以下用于序列化查询集的代码: 下面是我的 我需要将其序列化。但它说无法序列化
我目前正在尝试扩展一个使用Scala和Spark的机器学习应用程序。我正在使用我在Github上找到的Dieterich Lawson之前项目的结构 https://github.com/dieterichlawson/admm 该项目基本上使用SparkContext来构建训练样本块的RDD,然后对每个样本集执行局部计算(例如求解线性系统)。 我遵循同样的方案,但为了进行局部计算,我需要对每个训
问题内容: 我在android / java中对Location的子类进行序列化遇到了麻烦 位置不可序列化。我有一个名为FALocation的第一个子类,它没有任何实例变量。我已经宣布它可序列化。 然后,我有一个名为Waypoint的第二个类,看起来像这样: 序列化工作正常。 反序列化会产生跟随翼异常(腿对象包含一个航路点): 问题答案: 序列化位置绝对必要吗?也许您可以将其标记为瞬态,并在反序列
问题内容: 当我尝试运行以下代码时: 我得到以下异常: 如何成功使用包含s的对象? 问题答案: 在序列化之前将集合变成列表,或使用自定义处理程序来这样做:
我有一个看起来很常见的问题,但到目前为止,我还没有找到一个适合我的解决方案。我想我只是错过了一些小事情,但我已经崩溃了,请求帮助。我正在尝试使用flask和pymongo获得json输出。 以下是控制台中使用print(结果)的对象: 当我试图返回时,我得到了错误: TypeError: ObjectId类型的对象不是JSON可序列化的 类联系人(资源): 我试过bson。json_util建议。