当前位置: 首页 > 知识库问答 >
问题:

运行Spark流作业时出现序列化问题

燕扬
2023-03-14

无法解决以下由filtered.foreachpartition(iter=>{)触发的序列化问题。我认为foreachpartition可以解决序列化问题,但事实并非如此。那么,如何使用redispool

val redis_host = "localhost"
val redist_port = 6379
messages.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), redis_host, redis_port, 2000))
    iter.foreach({ msg =>
      println(msg.mkString(","))
    })
  })
})

我假设变量redis_hostredis_port是不可序列化的,但是我如何正确地序列化它们,以便代码能够在集群上工作,而不仅仅是在本地工作呢?

上面显示的代码抛出错误:

共有1个答案

南宫博简
2023-03-14

解决方案是在匿名函数中懒洋洋地初始化池。在java中,可以执行以下操作:

messages.foreachRDD(new RedisFunction(redis_host, redis_port))
messages.count()

class RedisFunction implements F {
    private Pool pool = null;
    private final String redis_host;
    private final String redis_port;

    RedisFunction(String redis_host, String redis_port) {
        this.redis_host = redis_host;
        this.redis_port = redis_port;
        initPool();
    }
    private void initPool() {
        this.pool = new Pool(new JedisPool(new JedisPoolConfig(), redis_host, redis_port, 2000))
    }
    public Void call(JavaRDD<> rdd) {
        if(this.pool == null) {
            initPool();
        }
        rdd = rdd.map(....);  /*your rdd transformations go here*/
        rdd.count();   //spark action
    }
}

上面的java示例应该可以帮助您解决序列化问题。

 类似资料:
  • 问题内容: 编写以下Java程序来试验apache spark。 该程序尝试从相应的文件中读取正负列表,将其与主文件进行比较并相应地过滤结果。 执行spark作业时引发以下错误, 任何指针? 问题答案: 创建匿名类时,编译器会做一些事情: 它将被重写为: 这就是为什么您可以使用的原因,因为迭代器不可序列化。 为了避免这种情况,只需在提取出next的结果之前:

  • 我正在尝试连接spark streaming应用程序中的DB2数据库和导致“org.apache.spark.sparkException:Task not Serializable”问题的数据库查询执行语句。请指教。下面是我有的示例代码供参考。 下面是错误日志:

  • 问题内容: 我有一个包含三个字段的单一对象:两个字符串和一个 我想做的是将这些对象中的一个保存到文件中,但是我不断收到 我用来存储文件的代码: 未初始化drawable时,一切正常。在此先感谢您的帮助。 问题答案: java.io.NotSerializableException: android.graphics.drawable.BitmapDrawable 该消息看起来非常清晰- 字段中的特

  • 我在运行数据流作业时得到以下错误。我正试图将现有的beam版本更新到2.11.0,但在运行时出现了以下错误。 java.lang.incompatibleClassChangeError:类org.apache.beam.model.pipeline.v1.runnerAPI$standardpTransforms$Primitives没有实现在org.apache.beam.runners.co

  • 我试图运行火花作业,基本上加载数据在卡桑德拉表。但它也产生了以下错误。

  • 我正在亚马逊的EMR集群上同时运行3个Spark流进程。问题是这三个Spark流作业中的一个基于进行处理: 有没有办法在不更改代码的情况下解决这个问题?