我正在编写一个 Flink 转换器,我有一个具有以下属性的自定义对象直方图
:
case class Histogram(
nRows: Int,
nCols: Int,
min: Int,
step: Double,
private val countMatrix: Array[ArrayBuffer[Double]],
private val cutMatrixL1: Array[ArrayBuffer[Double]],
val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
extends Serializable {
???
}
这是我的FitOperation
:
implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector] {
override def fit(
instance: PIDiscretizerTransformer,
fitParameters: ParameterMap,
input: DataSet[LabeledVector]): Unit = {
// get params...
val metric = input.map { x ⇒
// (instance, histrogram totalCount)
(x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
}.reduce { (m1, m2) ⇒
// Update Layer 1
val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)
// Update Layer 2 if neccesary
val updatedL2 = if (m1._3 % l2updateExamples == 0) {
updateL2(m1._1, updatedL1)
} else updatedL1
(m2._1, updatedL2, m1._3 + 1)
}.map(_._2)
// instance.metricsOption = Some(metric)
}
}
这样做很好,但是如果我取消最后一行的注释:< code > instance . metrics option = Some(metric)我会得到一个< code > Java . io . notserializableexception:org . Apache . flink . API . Scala . dataset
我如何在我的类直方图
中找到导致问题的对象?据我所知,<code>ArrayBuffer</code>是可序列化的,Map也是如此。尽管我发现了这个SO问题:
地图不能在scala中序列化?
上面写着<代码>。mapValues不可序列化,但我没有使用< code >。mapValues任意位置。
问题是您引用的是<code>实例。在MapFunction
中执行步骤实例
的类型为PIDiscretizerTransformer,无法序列化。因此,您需要在MapFunction
之外计算步骤,并将值传递到函数中。那么您的程序应该是可序列化的。
我正在使用Apache Flink对流数据执行分析。 我正在使用一个依赖项,其对象需要超过10秒才能创建,因为它在初始化之前读取hdfs中存在的几个文件。 如果我在open方法中初始化对象,我会得到一个超时异常,如果在接收器/平面图的构造函数中,我会得到序列化异常。 目前,我正在使用静态块来初始化其他类中的对象,使用前提条件。在主文件中选中NOTNULL(mgGenerator.mgGenerat
我尝试为我的定制类实现一个方法,使用Flink-Kafka连接器生成关于Kafka的数据。类原型如下所示: 将数据写入特定Kafka主题的方法如下: 我有另一种方法可以从Kafka主题获取对象的字段中的数据,效果很好。现在尝试从Kafka主题获取数据并将其写入另一个Kafka主题时,我遇到了错误: 主要代码: Java似乎试图序列化对象,而不仅仅是字段
我试图使用flink从kafka中读取数据,执行一些函数,并将结果返回到不同的kafka主题,但出现以下错误`组织。阿帕奇。Flink。应用程序编程接口。常见的InvalidProgrameException:MapFunction的实现不可序列化。对象可能包含或引用不可序列化的字段。 我收到了来自kafka的消息-对其进行了一些操作,并返回了一个对象列表,我想发送到不同的主题。 内部类也实现了可
问题内容: 我有三个类(,和),它们扩展了另一个类()。如何判断对象属于哪个子类?到目前为止,我有一个具有类名称的属性,但是我认为可以使用类似于javascript的typeof的运算符。(类似:) 问题答案: 您可以使用关键字。 但是请注意,需要使用它通常是不良设计的标志。通常,您应该在每个派生类中编写方法覆盖,以便您无需显式地检查是哪个类。
我目前正在尝试扩展一个使用Scala和Spark的机器学习应用程序。我正在使用我在Github上找到的Dieterich Lawson之前项目的结构 https://github.com/dieterichlawson/admm 该项目基本上使用SparkContext来构建训练样本块的RDD,然后对每个样本集执行局部计算(例如求解线性系统)。 我遵循同样的方案,但为了进行局部计算,我需要对每个训
Apache Ignite序列化/反序列化与字段反序列化的顺序有关。我需要在Ignite缓存中放置一个“b”实例,如下所示: 如果我运行以下代码: b1来自缓存映射值:null 问题是子字段在父字段之前被反序列化,因此当Ignite反序列化B时,它首先创建一个空的B对象(带有null“name”和“mapofb”),然后尝试反序列化mapofb。它创建Hashtable,然后反序列化它包含的每个对
我有以下用于序列化查询集的代码: 下面是我的 我需要将其序列化。但它说无法序列化
我有来自不同Apache Kafka主题的4个输入数据流(JSON消息)的Apache Flink作业,而我只有一个对象XFilterFunction--它执行一些筛选。我写了一些数据管道逻辑(原始示例): 在作业中使用一个新对象XFilterFunction是好还是坏? 还是使用两个新对象XFilterFunction更好?(2个流->2个新筛选器对象)