我有一个从CSV文件读取的数据集
:
val dataSet = env.readCsvFile[ElecNormNew](
getClass.getResource("/elecNormNew.arff").getPath,
pojoFields = Array("date", "day", "period", "nswprice", "nswdemand", "vicprice", "vicdemand", "transfer", "label")
据我所知,ElecNormNew
是一个POJO:
// elecNormNew POJO
class ElecNormNew(
var date: Double,
var day: Int,
var period: Double,
var nswprice: Double,
var nswdemand: Double,
var vicprice: Double,
var vicdemand: Double,
var transfer: Double,
var label: String) extends Serializable {
def this() = {
this(0, 0, 0, 0, 0, 0, 0, 0, "")
}
}
我还有一个简单的课程:
case class Discretizer[T](
data: DataSet[T],
nBins: Int = 5,
s: Int = 1000) {
private[this] val log = LoggerFactory.getLogger("Discretizer")
private[this] val V = Vector.tabulate(10)(_ => IntervalHeap(nBins, 1, 1, s))
private[this] def updateSamples(x: T): Vector[IntervalHeap] = {
log.warn(s"$x")
V
}
def discretize() = {
data map (x => updateSamples(x))
}
}
但当我尝试使用它时,例如从测试中:
val a = new Discretizer[ElecNormNew](dataSet)
a.discretize
我得到以下错误:
org.apache.flink.api.common.InvalidProgramException: Task not serializable
// ...
[info] at com.elbauldelprogramador.discretizers.IDADiscretizer.discretize(IDADiscretizer.scala:69)
// ...
[info] Cause: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
// ...
我读过这些问题和答案,但运气不好:
我想说你提到的第一个链接提供了答案:
问题在于,您从MapFunction中引用了数据集页面。这是不可能的,因为数据集只是数据流的逻辑表示,不能在运行时访问。
离散化
使用map
,因此这里也适用。
问题内容: 我需要通过scp复制战争文件。 我将添加到,但是仍然出现此错误: 原因:找不到org.apache.tools.ant.taskdefs.optional.ssh.Scp类。这看起来像是Ant的可选组件之一。 这是为了以防万一: http://gist.github.com/320859 问题答案: 从诊断顶部看,这看起来像您的问题: 可选任务:不可用 您的$ ANT_HOME / l
我正在做一个Flink项目,想将源JSON字符串数据解析为Json Object。我正在使用jackson-module-scala进行JSON解析。但是,在Flink API中使用JSON解析器时遇到了一些问题(例如)。 这里有一些代码示例,我无法理解为什么它会这样。 在本例中,我正在执行jackson module scala的官方exmaple代码告诉我的操作: > 创建一个新的 注册 <块
在我的Flink应用程序中,我使用java.time.instant表示UTC时间戳。应用程序运行良好,但我最近在Flink日志中注意到这条消息: “类class java.time.instant不能用作POJO类型,因为并非所有字段都是有效的POJO字段,必须作为GenericType处理。有关对性能影响的详细信息,请阅读Flink文档中的”数据类型&序列化“。”
在我的程序中,我有一个返回一些RDD的方法,我们称它为,它接受一个不可序列化的参数,并让RDD的类型为(我真正的RDD是元组类型,但只包含基元类型)。 当我尝试这样的事情时: 我得到的。 当我用替换(即某个常数)时,它会运行。 从序列化跟踪中,它试图序列化,并在那里阻塞,但我仔细检查了我的方法,这个对象从未出现在RDD中。 当我试图直接收集的输出时,即 我也没有问题。 该方法使用获取(本地)值序列
我一直在使用Phonegap开发一个Android应用程序,现在我想让它在应用程序关闭时仍然可以在应用程序中执行java/js代码。所以我知道我需要创建一个服务。如果我在phonegap上创建服务插件,我还能执行javascript代码还是只执行java? 有人做过这样的事情吗?我发现了这个讨论,但似乎没有用:http://groups.google.com/group/phonegap/brow