val df1 = session.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServer)
.option("assign", "{\"multi-stream1\" : [0]}")
.option("startingOffsets", latest)
.option("key.deserializer", classOf[StringDeserializer].getName)
.option("value.deserializer", classOf[StringDeserializer].getName)
.option("max.poll.records", 500)
.option("failOnDataLoss", true)
.load()
val query1 = df1
.select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
.select("key","data.*")
.writeStream.format("parquet").option("path", path).outputMode("append")
.option("checkpointLocation", checkpoint_dir1)
.partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
.queryName("query1").start()
val df2 = session.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServer)
.option("assign", "{\"multi-stream1\" : [1]}")
.option("startingOffsets", latest)
.option("key.deserializer", classOf[StringDeserializer].getName)
.option("value.deserializer", classOf[StringDeserializer].getName)
.option("max.poll.records", 500)
.option("failOnDataLoss", true)
.load()
val query2 = df2.select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
.select("key","data.*")
.writeStream.format("parquet").option("path", path).outputMode("append")
.option("checkpointLocation", checkpoint_dir2)
.partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
.queryName("query2").start()
session.streams.awaitAnyTermination()
运行环境:本地PC-同样的问题。Dataproc集群-spark-submit--包
org.apache.spark:spark-sql-kafka-0-102.12:2.4.5--类org.differentPartitionSparkStreaming--主纱--部署模式集群--num-executors 2--驱动程序--内存4G--executor-cores 4--executor-memory 4G gs:/dpl-ingestion-event/jars/stream_consumer-jar-with-dependencies.jar“{”Multiple-Streaming“:[0]}”最新“10.w.x.y:9092,10.r.s.t:9092,10.a.b.c:9092”{“Multiple-Streaming”:[1]}“--同样的问题。
检查点和输出路径是谷歌桶。
原木
20/07/24 19:37:27 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "e7d026f7-bf62-4a86-8697-a95a2fc893bb",
"runId" : "21169889-6e4b-419d-b338-2d4d61999f5b",
"name" : "reconcile",
"timestamp" : "2020-07-24T14:06:55.002Z",
"batchId" : 2,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 3549,
"getBatch" : 0,
"getEndOffset" : 1,
"queryPlanning" : 32,
"setOffsetRange" : 1,
"triggerExecution" : 32618,
"walCommit" : 15821
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Assign[multi-stream1-1]]",
"startOffset" : {
"multi-stream1" : {
"1" : 240
}
},
"endOffset" : {
"multi-stream1" : {
"1" : 250
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "FileSink[gs://dpl-ingestion-event/demo/test/single-partition/data]"
}
我解决了这个问题。根本原因是两个查询都试图写入相同的基路径。因此,_spark_meta信息存在重叠。Spark Structured Streaming维护检查点,以及_spark_metadata文件,以跟踪正在处理的批处理。
源火花单据:
为了正确处理部分故障,同时保持仅一次语义,每个批处理的文件都被写到一个唯一的目录中,然后原子地追加到元数据日志中。当初始化一个基于拼花的数据源以便读取时,我们首先检查这个日志目录并使用它而不是文件列表。
我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。
所以我一个月前开始学习spark和cassandra。我遇到了这样一个问题,我必须使用spark预先聚合来自传感器的数据,然后将其放入cassandra表。 这是我的应用程序流程 问题是,我需要将数据按秒、分、时、日、月聚合到每年。这导致我在cassandra中创建了90多个聚合表。 就我的进展而言,我发现我必须使用每个聚合的一个写流查询将每个聚合下沉到每个cassandra表,这导致我创建了这个
我正在使用Spark 2.2上的Spark结构化流媒体将文件从HDFS目录流式传输到Kafka主题。我想为我写的主题数据捕捉Kafka偏移量。 我正在使用 给Kafka写信。 当我利用 为了捕获流的进度信息,检索到的信息与Kafka中创建的偏移量不相关。 我假设这是因为流提供的信息实际上是关于我正在使用的文件流的,而与Kafka中编写的内容无关。 有没有一种Spark Structure流式处理方
背景:我写了一个简单的spark结构化蒸app,把数据从Kafka搬到S3。我发现,为了支持一次准确的保证,spark创建了_spark_metadata文件夹,但该文件夹最终变得太大,当流应用程序运行很长时间时,元数据文件夹变得太大,以至于我们开始出现OOM错误。我想摆脱Spark结构化流的元数据和检查点文件夹,自己管理偏移量。 我们如何管理Spark Streaming中的偏移量:我使用了va
我正在用Kafka运行一个结构化流应用程序。我发现如果系统因为某种原因停机几天...检查点变得过时,并且在Kafka中找不到与该检查点相对应的偏移量。我如何让Spark结构化流应用程序选择最后一个可用的偏移量,并从那里开始。我尝试将偏移量重置设置为“早期/最新”,但系统崩溃,出现以下错误: