Question:

How do I read JSON data from Kafka and store it in HDFS with Spark Structured Streaming?

江烨伟
2023-03-14

I am trying to read JSON messages from Kafka and store them in HDFS with Spark Structured Streaming.

I followed the example below; when my code looks like this:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.writeStream.format("json").option("checkpointLocation", "some/hdfs/path").start("/data")

then I get rows with binary values in HDFS:

{"value":"BINARY DATA","topic":"test_hdfs2","partition":0,"offset":3463075,"timestamp":"2018-07-24T20:51:33.655Z","timestampType":0}

The rows are written continuously as expected, but in binary format.

I found this post:

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

and I am trying to implement this example from it:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, IntegerType, StringType

schema = StructType().add("a", IntegerType()).add("b", StringType())
df.select(
  col("key").cast("string"),
  from_json(col("value").cast("string"), schema))

But here I get strange behaviour: a small file is written to HDFS that contains multiple empty JSON rows - {}

The job then quickly fails with the following exception:

18/07/24 22:25:47 ERROR datasources.FileFormatWriter: Aborting job
null. java.lang.IllegalStateException:
hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when
compacting batch 409 (compactInterval: 10)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
        at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
18/07/24 22:25:47 ERROR streaming.MicroBatchExecution: Query [id =
4f6c4ebc-f330-4697-b2db-7989b93dfba3, runId =
57575397-9fda-4370-9dcb-4550ae1576ec] terminated with error
org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.IllegalStateException:
hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when
compacting batch 409 (compactInterval: 10)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
        at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
        ... 17 more

Do you know how to implement this the right way?

1 Answer

吕征
2023-03-14

If you look at the error (399.compact doesn't exist when compacting batch 409 (compactInterval: 10)), it clearly indicates that the failure is caused by a missing compact file. Basically, in Spark Structured Streaming every batch run writes a metadata file under the _spark_metadata folder of the sink directory, and to avoid the overhead of accumulating lots of these files from past runs, Spark periodically merges them into .compact files.

I think the default is to attempt this compaction every 10 batches. So here, when it ran batch 409 and tried to compact, it did not find the previous compact file (399.compact) and failed. This is not really a business error, just a bookkeeping one, and one option to keep it from terminating your application is to set failOnDataLoss to false, as below.

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.servers)
  .option("subscribe", conf.topics)
  .option("failOnDataLoss", false)

You can also increase the cleanup delay for the sink's metadata log files with the following property:

spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", 60000)
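
To answer the original question end to end, here is a minimal PySpark sketch that casts the Kafka value to a string, parses it with from_json, flattens the struct, and writes JSON files to HDFS. The brokers, topic, paths and the a/b schema are just the placeholders from the question and the Databricks post, so substitute your own. Note that if the schema does not match the actual messages, from_json returns null and the JSON sink writes empty {} rows, which would explain the behaviour described above.

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, IntegerType, StringType

# Placeholder schema from the Databricks post -- replace with your real JSON schema.
schema = StructType().add("a", IntegerType()).add("b", StringType())

parsed = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")  # placeholder brokers
    .option("subscribe", "topic1")                                 # placeholder topic
    .option("failOnDataLoss", "false")                             # as suggested above
    .load()
    # Kafka delivers key/value as binary, so cast to string before parsing.
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    # Flatten the struct so the output files contain the JSON fields themselves.
    .select("data.*"))

query = (parsed.writeStream
    .format("json")
    .option("path", "/data")                         # placeholder output directory
    .option("checkpointLocation", "some/hdfs/path")  # placeholder checkpoint directory
    .start())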