触发器是否支持一次追加模式?
这里有一个最小的应用程序来再现这个问题。要旨
case class Model(valueForGroupBy: Int, time: Timestamp)
object Application {
val appName = "sample"
// val outputMode: OutputMode = OutputMode.Complete() // OK
val outputMode: OutputMode = OutputMode.Append() // KO with trigger once
val triggerMode: Trigger = Trigger.Once()
val delayThreshold: FiniteDuration = 1.minute // watermarking wait for late
val duration : FiniteDuration = 1.minute // window duration and slide
val topic = "SAMPLE_TOPIC"
val bootstrapServers = "127.0.0.1:9092"
type KafkaKV = (String, String)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder
.appName(appName)
.getOrCreate()
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import spark.implicits._
val streamReader: DataStreamReader = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
val df: DataFrame = streamReader.load()
val ds: Dataset[Model] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[KafkaKV]
.select(from_json($"value", Encoders.product[Model].schema).as("json"))
.select($"json.*")
.as[Model]
val groupByColumns = Seq(
window(new Column("time"), windowDuration = duration.toString, slideDuration = duration.toString),
new Column("valueForGroupBy")
)
val agg = ds
.withWatermark("time", delayThreshold.toString)
.groupBy(groupByColumns: _*)
.count()
val streamWriter = agg
.selectExpr(s"CAST(valueForGroupBy AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.trigger(triggerMode)
.outputMode(outputMode)
.format("console")
.option("truncate", value = false)
val streamingQuery = streamWriter.start()
streamingQuery.awaitTermination()
}
}
这是Spark已知的BUG。
在实际实现中,水印是在下面的批处理中持久化的。由于触发一次时没有后续批处理,因此watermak永远不会持久化。
Spark bugtracker中存在票据:https://issues.apache.org/jira/browse/Spark-24699
我正在从一个消息应用程序收集数据,我目前正在使用Flume,它每天发送大约5000万条记录 我希望使用Kafka,使用Spark Streaming从Kafka消费并将其持久化到hadoop并使用impala进行查询 我尝试的每种方法都有问题。。 方法1-将RDD另存为parquet,将外部配置单元parquet表指向parquet目录 问题是finalParquet.saveAsParquetF
更新:为了迭代支持,我不得不转向Flink流。不过还是会和Kafka试试看!
我有来自 3 个 mysql 表、1 个主表和两个子表的原始流。我尝试加入三个原始流并转换为单个输出流。如果父流上有任何更新,但如果子流发生任何变化,则不触发输出,它就可以工作。 父流上的任何新添加或更新都由处理器拾取,并将其与其他KTable连接,并在输出流上返回。但对child1stream或child2stream的任何添加或更新都不会触发输出流。 我认为将所有输入流设为 KTable,它们
我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该
我正在尝试使用python库Tweepy来传输twitter数据。我设置了工作环境,谷歌了一下这些东西,但是我不知道它们是如何工作的。我想在python (tweepy)中使用spark streaming(DStream-Batch processing)。我至少经历了以下环节: < li >如何获取tweepy中某个位置的特定标签的推文? < Li > http://spark . Apach
我们需要在Kafka主题上实现连接,同时考虑延迟数据或“不在连接中”,这意味着流中延迟或不在连接中的数据不会被丢弃/丢失,但会被标记为超时, 连接的结果被产生以输出Kafka主题(如果发生超时字段)。 (独立部署中的火花2.1.1,Kafka 10) Kafka在主题:X,Y,...输出主题结果将如下所示: 我发现三个解决方案写在这里,1和2从火花流官方留档,但与我们不相关(数据不在加入Dtsre