当前位置: 首页 > 知识库问答 >
问题:

在同一Spark会话中运行多个Spark Kafka结构化流查询,增加偏移量但显示numInputRows%0

苗冯浩
2023-03-14
    val df1 = session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrapServer)
            .option("assign", "{\"multi-stream1\" : [0]}")
            .option("startingOffsets", latest)
            .option("key.deserializer", classOf[StringDeserializer].getName)
            .option("value.deserializer", classOf[StringDeserializer].getName)
            .option("max.poll.records", 500)
            .option("failOnDataLoss", true)
            .load()
    val query1 = df1
            .select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
            .select("key","data.*")
            .writeStream.format("parquet").option("path", path).outputMode("append")
            .option("checkpointLocation", checkpoint_dir1)
            .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
            .queryName("query1").start()
    
    val df2 = session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrapServer)
            .option("assign", "{\"multi-stream1\" : [1]}")
            .option("startingOffsets", latest)
            .option("key.deserializer", classOf[StringDeserializer].getName)
            .option("value.deserializer", classOf[StringDeserializer].getName)
            .option("max.poll.records", 500)
            .option("failOnDataLoss", true)
            .load()
val query2 = df2.select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
            .select("key","data.*")
            .writeStream.format("parquet").option("path", path).outputMode("append")
            .option("checkpointLocation", checkpoint_dir2)
            .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
            .queryName("query2").start()
    session.streams.awaitAnyTermination()

运行环境:本地PC-同样的问题。Dataproc集群-spark-submit--包

org.apache.spark:spark-sql-kafka-0-102.12:2.4.5--类org.differentPartitionSparkStreaming--主纱--部署模式集群--num-executors 2--驱动程序--内存4G--executor-cores 4--executor-memory 4G gs:/dpl-ingestion-event/jars/stream_consumer-jar-with-dependencies.jar“{”Multiple-Streaming“:[0]}”最新“10.w.x.y:9092,10.r.s.t:9092,10.a.b.c:9092”{“Multiple-Streaming”:[1]}“--同样的问题。

检查点和输出路径是谷歌桶。

原木

20/07/24 19:37:27 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "e7d026f7-bf62-4a86-8697-a95a2fc893bb",
  "runId" : "21169889-6e4b-419d-b338-2d4d61999f5b",
  "name" : "reconcile",
  "timestamp" : "2020-07-24T14:06:55.002Z",
  "batchId" : 2,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 3549,
    "getBatch" : 0,
    "getEndOffset" : 1,
    "queryPlanning" : 32,
    "setOffsetRange" : 1,
    "triggerExecution" : 32618,
    "walCommit" : 15821
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Assign[multi-stream1-1]]",
    "startOffset" : {
      "multi-stream1" : {
        "1" : 240
      }
    },
    "endOffset" : {
      "multi-stream1" : {
        "1" : 250
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[gs://dpl-ingestion-event/demo/test/single-partition/data]"
  }

共有1个答案

祝嘉懿
2023-03-14

我解决了这个问题。根本原因是两个查询都试图写入相同的基路径。因此,_spark_meta信息存在重叠。Spark Structured Streaming维护检查点,以及_spark_metadata文件,以跟踪正在处理的批处理。

源火花单据:

为了正确处理部分故障,同时保持仅一次语义,每个批处理的文件都被写到一个唯一的目录中,然后原子地追加到元数据日志中。当初始化一个基于拼花的数据源以便读取时,我们首先检查这个日志目录并使用它而不是文件列表。

 类似资料:
  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 所以我一个月前开始学习spark和cassandra。我遇到了这样一个问题,我必须使用spark预先聚合来自传感器的数据,然后将其放入cassandra表。 这是我的应用程序流程 问题是,我需要将数据按秒、分、时、日、月聚合到每年。这导致我在cassandra中创建了90多个聚合表。 就我的进展而言,我发现我必须使用每个聚合的一个写流查询将每个聚合下沉到每个cassandra表,这导致我创建了这个

  • 我正在使用Spark 2.2上的Spark结构化流媒体将文件从HDFS目录流式传输到Kafka主题。我想为我写的主题数据捕捉Kafka偏移量。 我正在使用 给Kafka写信。 当我利用 为了捕获流的进度信息,检索到的信息与Kafka中创建的偏移量不相关。 我假设这是因为流提供的信息实际上是关于我正在使用的文件流的,而与Kafka中编写的内容无关。 有没有一种Spark Structure流式处理方

  • 背景:我写了一个简单的spark结构化蒸app,把数据从Kafka搬到S3。我发现,为了支持一次准确的保证,spark创建了_spark_metadata文件夹,但该文件夹最终变得太大,当流应用程序运行很长时间时,元数据文件夹变得太大,以至于我们开始出现OOM错误。我想摆脱Spark结构化流的元数据和检查点文件夹,自己管理偏移量。 我们如何管理Spark Streaming中的偏移量:我使用了va

  • 我正在用Kafka运行一个结构化流应用程序。我发现如果系统因为某种原因停机几天...检查点变得过时,并且在Kafka中找不到与该检查点相对应的偏移量。我如何让Spark结构化流应用程序选择最后一个可用的偏移量,并从那里开始。我尝试将偏移量重置设置为“早期/最新”,但系统崩溃,出现以下错误: