当前位置: 首页 > 面试题库 >

Azure DataBricks Stream foreach失败,并显示NotSerializableException

钮高朗
2023-03-14
问题内容

我想不断详细说明数据集流的行(最初由Kafka发起):基于条件,我想更新Radis哈希。这是我的代码段(lastContacts是上一个命令的结果,该命令是这种类型的流:org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]。扩展为org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]):

class MyStreamProcessor extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

val query = lastContacts
  .writeStream
  .foreach(new MyStreamProcessor())
  .start()

query.awaitTermination()

我收到一个巨大的堆栈跟踪,相关部分(我认为)是这样的: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter

谁能解释为什么发生此异常以及如何避免?谢谢!


问题答案:

Spark上下文不可序列化。

ForeachWriter的任何实现都必须可序列化,因为每个任务都将获得所提供对象的新的序列化反序列化副本。因此,强烈建议在调用open(…)方法之后执行任何用于写入数据的初始化操作(例如,打开连接或启动事务),这表明任务已准备好生成数据。

在您的代码中,您尝试在流程方法中使用spark上下文,

override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    *sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)*
  }

要将数据发送到Redis,您需要创建自己的连接并以open方法打开它,然后在process方法中使用它。

看一下如何创建Redis连接池。https://github.com/RedisLabs/spark-
redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala



 类似资料:
  • 但是得到一个错误: Py4JJavaError:调用o41时出错。显示字符串:组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段1.0中的任务0失败1次,最近一次失败:阶段1.0中的任务0.0丢失(TID 1)(10.75.81.111执行器驱动程序):org。阿帕奇。火花SparkException:Python worker无法连接回。

  • 问题内容: 我在使用Python 2.7(Anaconda)的OSX El Capitan。启动命令将产生: 这些都不起作用(退出时出现相同的错误)。 已是最新。我基本上尝试遵循相关问题中的所有建议,但没有成功。任何帮助表示赞赏,谢谢! 问题答案: 从此博客复制。 通过查找可能得出结论的信息,即brew安装的配置可能不正确,请打开脚本以修改其中的某些内容: 变成: 保存 然后重新安装mysql-p

  • 我有一个多项目gradle构建,Spring Boot按照默认gradle约定进行结构化。 我目前的项目是(A)将gradle从5. x升级到7.3. x,(B)使用带有Spring Boot的嵌入式Tomcat。 这是一个已经存在很多年的项目,是Spring Boot,但是传统上一直作为一个战争文件部署在Tomcat中。 我已经按照gradle迁移指南将gradle升级到7.3.3,并正确构建了

  • 问题内容: 为什么: 产生错误? 如果它刚返回就不会更合逻辑吗? 问题答案: 由于空字符串不是有效的JSON,因此返回不正确,因为它是有效的JSON。例如 返回。无效的JSON也被解析为null将是一个错误。 空字符串不是有效的JSON,两个引号是有效的JSON。这是一个重要的区别。 也就是说,包含两个引号的字符串与空字符串不同。 将正确解析(返回一个空字符串)。但 将不会。 有效的最小JSON字

  • 问题内容: 我正在使用React Native开发一个简单的应用程序。我正在Android设备上对其进行测试。我创建了一个Node.js服务器来监听请求,该服务器运行在http:// localhost:3333 /上 。接下来,我要从index.android.js进行提取请求。下面是代码。 节点服务器上请求处理程序的代码如下 但是,获取请求无效。我在Chrome控制台中收到的错误是:TypeE

  • 问题内容: 我需要更换 我用了 但它引发异常 java.lang.IllegalArgumentException:非法的组引用 问题答案: 使用第二个参数: 正则表达式的替换参数中的is组符号 所以你需要逃脱