当前位置: 首页 > 知识库问答 >
问题:

在spark结构化流媒体中从kafka/json数据源写入损坏的数据

孔逸春
2023-03-14

在火花批处理作业中,我通常将JSON数据源写入文件,并可以使用DataFrame阅读器的损坏列功能将损坏的数据写入单独的位置,并使用另一个阅读器从同一作业中写入有效数据。(数据写成拼花)

但是在火花结构流中,我首先通过Kafka作为字符串读取流,然后使用from_json来获取我的数据帧。然后from_json使用JsonToSTRts,它在解析器中使用FailFast模式,并且不会将未解析的字符串返回到DataFrame中的列。(请参阅参考文献中的注释)然后我如何使用SSS?

最后,在批处理作业中,同一作业可以写入两个数据帧。但是Spark结构化流需要对多个接收器进行特殊处理。然后在Spark 2.3中。1(我的当前版本)我们应该包括关于如何正确写入损坏和无效流的详细信息。。。

裁判:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html

val rawKafkaDataFrame=spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.broker)
  .option("kafka.ssl.truststore.location", path.toString)
  .option("kafka.ssl.truststore.password", config.pass)
  .option("kafka.ssl.truststore.type", "JKS")
  .option("kafka.security.protocol", "SSL")
  .option("subscribe", config.topic)
  .option("startingOffsets", "earliest")

  .load()

val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))

// does not provide a corrupt column or way to work with corrupt
jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")

共有2个答案

葛阳
2023-03-14

我只是想找出结构化流媒体的_corrupt_记录等价物。这是我想到的;希望它能让你更接近你想要的:

// add a status column to partition our output by
// optional: only keep the unparsed json if it was corrupt
// writes up to 2 subdirs: 'out.par/status=OK' and 'out.par/status=CORRUPT'
// additional status codes for validation of nested fields could be added in similar fashion

df.withColumn("struct", from_json($"value", schema))
  .withColumn("status", when($"struct".isNull, lit("CORRUPT")).otherwise(lit("OK")))
  .withColumn("value", when($"status" <=> lit("CORRUPT"), $"value"))
  .write
  .partitionBy("status")
  .parquet("out.par")
宇文元明
2023-03-14

当您从string转换为json时,如果它不能用提供的架构进行解析,它将返回null。您可以筛选空值并选择字符串。像这样的东西。

val jsonDF =  jsonDataFrame.withColumn("json", from_json(col("value"), schema))
val invalidJsonDF = jsonDF.filter(col("json").isNull).select("value")
 类似资料:
  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我正在使用Spark结构化流媒体;我的DataFrame具有以下架构 如何使用Parquet格式执行writeStream并写入数据(包含zoneId、deviceId、TimesInclast;除日期外的所有内容)并按日期对数据进行分区?我尝试了以下代码,但partition by子句不起作用

  • 我尝试在spark中使用结构化流媒体,因为它非常适合我的用例。然而,我似乎找不到将Kafka传入的数据映射到case类的方法。根据官方文件,我可以做到这一点。 mobEventDF有这样一个模式 有没有更好的方法?如何将其直接映射到下面的Scala Case类中?

  • 我使用spark 2.2.1,kafka_2.12-1.0.0和scala从kafka获取一些json数据,但是,我只连接了kafka,没有数据输出。 这里是我的scala代码: 这是我的绒球.xml 我运行这段代码,控制台没有显示任何来自kafka的数据。 这里是控制台输出: 输出只是说我的消费者群体已经死亡。我的kafka运行良好,我可以使用控制台命令从“行为”主题中获取数据。总之,Kafka

  • 我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。