我对year执行了一个简单的group by操作,并做了一些聚合,如下所示。我尝试将结果附加到hdfs路径,如下所示。我说错了,
org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets
without watermark;;
Aggregate [year#88], [year#88, sum(rating#89) AS rating#173,
sum(cast(duration#90 as bigint)) AS duration#175L]
+- EventTimeWatermark event_time#96: timestamp, interval 10 seconds
val spark =SparkSession.builder().appName("mddd").
enableHiveSupport().config("hive.exec.dynamic.partition", "true").
config("hive.exec.dynamic.partition.mode", "nonstrict").
config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
config("spark.debug.maxToStringFields",100).
getOrCreate()
val mySchema = StructType(Array(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("year", IntegerType),
StructField("rating", DoubleType),
StructField("duration", IntegerType)
))
val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/")
import java.util.Calendar
val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").groupBy($"year").agg(sum($"rating").as("rating"),sum($"duration").as("duration"))
val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))
df_agg_with_time.writeStream.outputMode("append").partitionBy("year").format("csv").
option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").start()
id,name,year,rating,duration
1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733
year,rating,duration
1993,7.4,8956
1921,6.0,15212
1963,10.7,17389
我真的不知道我的方法有什么问题。请帮忙
这是一个有很多方面的问题:
>
手册声明:必须在聚合中使用的时间戳列的同一列上调用withWatermark。
例如,DF.WithWatermark(“Time”,“1 min”).GroupBy(“Time2”).Count()在追加输出模式中无效,因为水印是在与聚合列不同的列上定义的。简单地说,追加需要水印。我觉得你有问题。
.enableHiveSupport().config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
因此,一般来说:
然后:
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
//create stream from socket
import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")
val socketStreamDs = sparkSession.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
.as[String]
val stockDs = socketStreamDs.map(value => (value.trim.split(","))).map(entries=>(new java.sql.Timestamp(entries(0).toLong),entries(1),entries(2).toDouble)).toDF("time","symbol","value")//.toDS()
val windowedCount = stockDs
.withWatermark("time", "20000 milliseconds")
.groupBy(
window($"time", "10 seconds"),
$"symbol"
)
.agg(sum("value"), count($"symbol"))
val query =
windowedCount.writeStream
.format("console")
.option("truncate", "false")
.outputMode(OutputMode.Append())
query.start().awaitTermination()
Batch: 14
----------------------------------------------+------+----------+-------------+
|window |symbol|sum(value)|count(symbol)|
+---------------------------------------------+------+----------+-------------+
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap1"|4200.0 |6 |
|[2016-04-27 04:34:30.0,2016-04-27 04:34:40.0]|"app1"|800.0 |2 |
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap2"|2500.0 |1 |
|[2016-04-27 04:34:40.0,2016-04-27 04:34:50.0]|"app1"|2800.0 |4 |
+---------------------------------------------+------+----------+-------------+
这是一个相当大的话题,你需要整体来看。
您可以看到,在某些情况下,对输出进行计数可能会很方便,尽管avg输出可以用于对总的avg进行计数。成功。
所以我试着从一个字符串中使用getBytes,我知道如果它遇到一个不能变成真实数据的字符,它会抛出一个UnsupportedEncodingException。我添加了java.io来提供异常,但是当我把它放到try catch语句中时,我得到了“UnsupportedEncodingException的不可到达的catch块。此异常从不从try语句体中引发" 这是我的确切结构。myCharact
我正在尝试从进行HTTP调用。该调用在post man中工作正常,下面是调用的版本, 但是我试图使用进行相同的调用。 但是,应用程序正在抛出异常, JAVAlang.IllegalStateException:block()/blockFirst()/blockLast()正在阻塞,这在thread reactor-http-nio-3中不受支持 这里出了什么问题?我对反应式编程非常陌生,谷歌对这
我不知道;我不太明白在哪里可以抛出这个异常。 例如,我正在实现
相反,将引发“UnsupportedOperationException”。看起来ContainerRequest没有从修改的请求中提取UserPrincipal。 修改是通过 问题是如何将主体信息从HttpServerProbe传输到ContainerRequestFilter。request具有安全信息(在本例中是SSL客户机证书信息),而com.sun.jersey.spi.containe
是否有可能用Java构造一段代码,从而生成一个假设的不可修补? 想到的想法是使用例如拦截器或面向方面的编程。
在使用boto3 for python实现aws textract时。 代码: 下面是aws的凭证和配置文件 我得到了一个例外: 我对AWS textract有点陌生,任何帮助都将不胜感激。