当前位置: 首页 > 知识库问答 >
问题:

spark structured streaming异常:没有水印不支持追加输出模式

劳嘉实
2023-03-14

我对year执行了一个简单的group by操作,并做了一些聚合,如下所示。我尝试将结果附加到hdfs路径,如下所示。我说错了,

   org.apache.spark.sql.AnalysisException: Append output mode not supported 
   when there are streaming aggregations on streaming DataFrames/DataSets 
   without watermark;;
   Aggregate [year#88], [year#88, sum(rating#89) AS rating#173, 
   sum(cast(duration#90 as bigint)) AS duration#175L]
   +- EventTimeWatermark event_time#96: timestamp, interval 10 seconds
    val spark =SparkSession.builder().appName("mddd").
    enableHiveSupport().config("hive.exec.dynamic.partition", "true").
    config("hive.exec.dynamic.partition.mode", "nonstrict").
    config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
    config("spark.debug.maxToStringFields",100).
    getOrCreate()

    val mySchema = StructType(Array(
     StructField("id", IntegerType),
     StructField("name", StringType),
     StructField("year", IntegerType),
     StructField("rating", DoubleType),
     StructField("duration", IntegerType)
    ))

    val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/") 
    import java.util.Calendar
    val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))

    val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").groupBy($"year").agg(sum($"rating").as("rating"),sum($"duration").as("duration"))
    val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))

    df_agg_with_time.writeStream.outputMode("append").partitionBy("year").format("csv").
    option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").start()
    id,name,year,rating,duration
    1,The Nightmare Before Christmas,1993,3.9,4568
    2,The Mummy,1993,3.5,4388
    3,Orphans of the Storm,1921,3.2,9062
    4,The Object of Beauty,1921,2.8,6150
    5,Night Tide,1963,2.8,5126
    6,One Magic Christmas,1963,3.8,5333
    7,Muriel's Wedding,1963,3.5,6323
    8,Mother's Boys,1963,3.4,5733
    year,rating,duration
    1993,7.4,8956
    1921,6.0,15212
    1963,10.7,17389

我真的不知道我的方法有什么问题。请帮忙

共有1个答案

裴韬
2023-03-14

这是一个有很多方面的问题:

>

  • 结构化流API有IMHO的限制。
  • 一个人可以管道多个查询,技术上它可以运行,但不产生输出,所以这样做没有价值--而且即使您可以指定它,它也不能执行这样的其他功能。
  • 手册声明:必须在聚合中使用的时间戳列的同一列上调用withWatermark。

    例如,DF.WithWatermark(“Time”,“1 min”).GroupBy(“Time2”).Count()在追加输出模式中无效,因为水印是在与聚合列不同的列上定义的。简单地说,追加需要水印。我觉得你有问题。

      .enableHiveSupport().config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
    
      null

    因此,一般来说:

    • 在“完成”、“追加”和“更新”选项中,我认为您选择了正确的“追加”选项。可以使用更新,但我将其排除在范围之外。
    • 但未将event_time放入窗口中。你应该这么做。我在这里的末尾放了一个例子,在Spark Shell中运行时,我无法使case类工作--这就是为什么要花这么长时间的原因,但在一个编译的程序中,它不是一个问题,或者数据库。
    • 在功能上,您不能编写多个查询来执行您尝试的聚合。在我的例子中,它只会产生一些错误。
    • 我建议您使用我使用的时间戳方法,它更容易,因为我无法测试您的所有内容。

    然后:

      null
    import java.sql.Timestamp
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.OutputMode
    
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()
    //create stream from socket
    
    import sparkSession.implicits._
    sparkSession.sparkContext.setLogLevel("ERROR")
    val socketStreamDs = sparkSession.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
    
    val stockDs = socketStreamDs.map(value => (value.trim.split(","))).map(entries=>(new java.sql.Timestamp(entries(0).toLong),entries(1),entries(2).toDouble)).toDF("time","symbol","value")//.toDS() 
    
    val windowedCount = stockDs
      .withWatermark("time", "20000 milliseconds")
      .groupBy( 
        window($"time", "10 seconds"),
               $"symbol" 
      )
      .agg(sum("value"), count($"symbol"))
    
    val query =
      windowedCount.writeStream
        .format("console")
        .option("truncate", "false")
        .outputMode(OutputMode.Append())
    
    query.start().awaitTermination()
    
    Batch: 14
    ----------------------------------------------+------+----------+-------------+  
    |window                                       |symbol|sum(value)|count(symbol)|
    +---------------------------------------------+------+----------+-------------+
    |[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap1"|4200.0    |6            |
    |[2016-04-27 04:34:30.0,2016-04-27 04:34:40.0]|"app1"|800.0     |2            |
    |[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap2"|2500.0    |1            |
    |[2016-04-27 04:34:40.0,2016-04-27 04:34:50.0]|"app1"|2800.0    |4            |
    +---------------------------------------------+------+----------+-------------+
    

    这是一个相当大的话题,你需要整体来看。

    您可以看到,在某些情况下,对输出进行计数可能会很方便,尽管avg输出可以用于对总的avg进行计数。成功。

  •  类似资料:
    • 所以我试着从一个字符串中使用getBytes,我知道如果它遇到一个不能变成真实数据的字符,它会抛出一个UnsupportedEncodingException。我添加了java.io来提供异常,但是当我把它放到try catch语句中时,我得到了“UnsupportedEncodingException的不可到达的catch块。此异常从不从try语句体中引发" 这是我的确切结构。myCharact

    • 我正在尝试从进行HTTP调用。该调用在post man中工作正常,下面是调用的版本, 但是我试图使用进行相同的调用。 但是,应用程序正在抛出异常, JAVAlang.IllegalStateException:block()/blockFirst()/blockLast()正在阻塞,这在thread reactor-http-nio-3中不受支持 这里出了什么问题?我对反应式编程非常陌生,谷歌对这

    • 我不知道;我不太明白在哪里可以抛出这个异常。 例如,我正在实现

    • 相反,将引发“UnsupportedOperationException”。看起来ContainerRequest没有从修改的请求中提取UserPrincipal。 修改是通过 问题是如何将主体信息从HttpServerProbe传输到ContainerRequestFilter。request具有安全信息(在本例中是SSL客户机证书信息),而com.sun.jersey.spi.containe

    • 是否有可能用Java构造一段代码,从而生成一个假设的不可修补? 想到的想法是使用例如拦截器或面向方面的编程。

    • 在使用boto3 for python实现aws textract时。 代码: 下面是aws的凭证和配置文件 我得到了一个例外: 我对AWS textract有点陌生,任何帮助都将不胜感激。