无法解决以下由filtered.foreachpartition(iter=>{
)触发的序列化问题。我认为foreachpartition
可以解决序列化问题,但事实并非如此。那么,如何使用redispool
?
val redis_host = "localhost"
val redist_port = 6379
messages.foreachRDD(rdd => {
rdd.foreachPartition(iter => {
val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), redis_host, redis_port, 2000))
iter.foreach({ msg =>
println(msg.mkString(","))
})
})
})
我假设变量redis_host
和redis_port
是不可序列化的,但是我如何正确地序列化它们,以便代码能够在集群上工作,而不仅仅是在本地工作呢?
上面显示的代码抛出错误:
解决方案是在匿名函数中懒洋洋地初始化池。在java中,可以执行以下操作:
messages.foreachRDD(new RedisFunction(redis_host, redis_port))
messages.count()
class RedisFunction implements F {
private Pool pool = null;
private final String redis_host;
private final String redis_port;
RedisFunction(String redis_host, String redis_port) {
this.redis_host = redis_host;
this.redis_port = redis_port;
initPool();
}
private void initPool() {
this.pool = new Pool(new JedisPool(new JedisPoolConfig(), redis_host, redis_port, 2000))
}
public Void call(JavaRDD<> rdd) {
if(this.pool == null) {
initPool();
}
rdd = rdd.map(....); /*your rdd transformations go here*/
rdd.count(); //spark action
}
}
上面的java示例应该可以帮助您解决序列化问题。
问题内容: 编写以下Java程序来试验apache spark。 该程序尝试从相应的文件中读取正负列表,将其与主文件进行比较并相应地过滤结果。 执行spark作业时引发以下错误, 任何指针? 问题答案: 创建匿名类时,编译器会做一些事情: 它将被重写为: 这就是为什么您可以使用的原因,因为迭代器不可序列化。 为了避免这种情况,只需在提取出next的结果之前:
我正在尝试连接spark streaming应用程序中的DB2数据库和导致“org.apache.spark.sparkException:Task not Serializable”问题的数据库查询执行语句。请指教。下面是我有的示例代码供参考。 下面是错误日志:
问题内容: 我有一个包含三个字段的单一对象:两个字符串和一个 我想做的是将这些对象中的一个保存到文件中,但是我不断收到 我用来存储文件的代码: 未初始化drawable时,一切正常。在此先感谢您的帮助。 问题答案: java.io.NotSerializableException: android.graphics.drawable.BitmapDrawable 该消息看起来非常清晰- 字段中的特
我在运行数据流作业时得到以下错误。我正试图将现有的beam版本更新到2.11.0,但在运行时出现了以下错误。 java.lang.incompatibleClassChangeError:类org.apache.beam.model.pipeline.v1.runnerAPI$standardpTransforms$Primitives没有实现在org.apache.beam.runners.co
我试图运行火花作业,基本上加载数据在卡桑德拉表。但它也产生了以下错误。
我正在亚马逊的EMR集群上同时运行3个Spark流进程。问题是这三个Spark流作业中的一个基于进行处理: 有没有办法在不更改代码的情况下解决这个问题?