当前位置: 首页 > 知识库问答 >
问题:

触发一次的火花流追加输出模式

祁鸿哲
2023-03-14
    null

触发器是否支持一次追加模式?

这里有一个最小的应用程序来再现这个问题。要旨

case class Model(valueForGroupBy: Int, time: Timestamp)

object Application {

  val appName = "sample"

//  val outputMode: OutputMode = OutputMode.Complete() // OK
  val outputMode: OutputMode = OutputMode.Append() // KO with trigger once

  val triggerMode: Trigger = Trigger.Once()

  val delayThreshold: FiniteDuration = 1.minute // watermarking wait for late

  val duration : FiniteDuration = 1.minute // window duration and slide

  val topic = "SAMPLE_TOPIC"
  val bootstrapServers = "127.0.0.1:9092"


  type KafkaKV = (String, String)

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder
      .appName(appName)
      .getOrCreate()

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val streamReader: DataStreamReader = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "false")

    val df: DataFrame = streamReader.load()

    val ds: Dataset[Model] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[KafkaKV]
      .select(from_json($"value", Encoders.product[Model].schema).as("json"))
      .select($"json.*")
      .as[Model]

    val groupByColumns = Seq(
      window(new Column("time"), windowDuration = duration.toString, slideDuration = duration.toString),
      new Column("valueForGroupBy")
    )

    val agg = ds
      .withWatermark("time", delayThreshold.toString)
      .groupBy(groupByColumns: _*)
      .count()

    val streamWriter = agg
      .selectExpr(s"CAST(valueForGroupBy AS STRING) AS key", "to_json(struct(*)) AS value")
      .writeStream
      .trigger(triggerMode)
      .outputMode(outputMode)
      .format("console")
      .option("truncate", value = false)

    val streamingQuery = streamWriter.start()

    streamingQuery.awaitTermination()

  }
}

共有1个答案

长孙鸿振
2023-03-14

这是Spark已知的BUG。

在实际实现中,水印是在下面的批处理中持久化的。由于触发一次时没有后续批处理,因此watermak永远不会持久化。

Spark bugtracker中存在票据:https://issues.apache.org/jira/browse/Spark-24699

 类似资料:
  • 我正在从一个消息应用程序收集数据,我目前正在使用Flume,它每天发送大约5000万条记录 我希望使用Kafka,使用Spark Streaming从Kafka消费并将其持久化到hadoop并使用impala进行查询 我尝试的每种方法都有问题。。 方法1-将RDD另存为parquet,将外部配置单元parquet表指向parquet目录 问题是finalParquet.saveAsParquetF

  • 更新:为了迭代支持,我不得不转向Flink流。不过还是会和Kafka试试看!

  • 我有来自 3 个 mysql 表、1 个主表和两个子表的原始流。我尝试加入三个原始流并转换为单个输出流。如果父流上有任何更新,但如果子流发生任何变化,则不触发输出,它就可以工作。 父流上的任何新添加或更新都由处理器拾取,并将其与其他KTable连接,并在输出流上返回。但对child1stream或child2stream的任何添加或更新都不会触发输出流。 我认为将所有输入流设为 KTable,它们

  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该

  • 我正在尝试使用python库Tweepy来传输twitter数据。我设置了工作环境,谷歌了一下这些东西,但是我不知道它们是如何工作的。我想在python (tweepy)中使用spark streaming(DStream-Batch processing)。我至少经历了以下环节: < li >如何获取tweepy中某个位置的特定标签的推文? < Li > http://spark . Apach

  • 我们需要在Kafka主题上实现连接,同时考虑延迟数据或“不在连接中”,这意味着流中延迟或不在连接中的数据不会被丢弃/丢失,但会被标记为超时, 连接的结果被产生以输出Kafka主题(如果发生超时字段)。 (独立部署中的火花2.1.1,Kafka 10) Kafka在主题:X,Y,...输出主题结果将如下所示: 我发现三个解决方案写在这里,1和2从火花流官方留档,但与我们不相关(数据不在加入Dtsre