我正在使用Spark 2.2上的Spark结构化流媒体将文件从HDFS目录流式传输到Kafka主题。我想为我写的主题数据捕捉Kafka偏移量。
我正在使用
val write = jsonDF
.writeStream.format("kafka")
.option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint"))
.option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer"))
.option("topic", Config().getString(domain + ".kafkaTopic"))
.start()
给Kafka写信。
当我利用
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
为了捕获流的进度信息,检索到的信息与Kafka中创建的偏移量不相关。
我假设这是因为流提供的信息实际上是关于我正在使用的文件流的,而与Kafka中编写的内容无关。
有没有一种Spark Structure流式处理方法可以捕获写入Kafka时生成的偏移量信息?
添加示例:当我在创建主题后从源1运行三行数据时,我得到:
运行1:开始偏移量:null,结束偏移量:{“logOffset”:0}开始偏移量:{“logOffset”:0},结束偏移量:{“logOffset”:0}
Kafka Says:
ruwe:2:1
ruwe:1:1
ruwe:0:1
运行2;
Start Offset: {"logOffset":0}, End offset: {"logOffset":1}
Start Offset: {"logOffset":1}, End offset: {"logOffset":1}
Kafka Says:
ruwe:2:2
ruwe:1:2
ruwe:0:2
运行3:
Start Offset: {"logOffset":1}, End offset: {"logOffset":2}
Start Offset: {"logOffset":2}, End offset: {"logOffset":2}
Kafka Says:
ruwe:2:3
ruwe:1:3
ruwe:0:3
然后我用来自不同来源的相同程序运行数据并接收
Start Offset: null, End offset: {"logOffset":0}
Start Offset: {"logOffset":0}, End offset: {"logOffset":0}
and of course Kafka continued to increment
这表示Spark正在报告基于源的信息
我想知道目标中创建了什么。
在阅读Spark结构流的代码后,特别是Kafka KafkaWriter、KafkaWriteTask和CachedKafkaProducer,Spark不会消耗回调中从KafkaProducer返回的偏移量。他们定义的回调只捕获异常。基于此,我想说在当前版本2.2中它无法完成。
他们提供的信息都是围绕查询源而不是目标提供的。
有没有一种Spark Structure流式处理方法可以捕获写入Kafka时生成的偏移量信息?
是的,在onQuerygres
中,您需要查看StreamingQueryProgress.sources
,它是一个Array[Sourcegres]
。它有两个字符串,start Offset
和endOffset
,它们是您可以解析的JSON:
sparkSession.streams.addListener(new StreamingQueryListener {override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ???
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
val source = event.progress.sources.headOption
source.map(src => println(s"Start Offset: ${src.startOffset}, End offset: ${src.endOffset}"))
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
})
JSON具有以下结构:
"startOffset" : {
"topic-name" : {
"0" : 1,
"1" : 22,
"2" : 419,
}
},
"endOffset" : {
"topic-name" : {
"0" : 10,
"1" : 100,
"2" : 1000
}
}
我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。
使用lib:运行spark结构化流时,我们不断收到有关当前偏移量获取的错误: 引起原因:org.apache.spark.Spark异常:由于阶段失败而中止作业:阶段0.0中的任务0失败4次,最近的失败:阶段0.0中丢失任务0.3(TID 3,qa2-hdp-4.acuityads.org,执行器2):java.lang.断言错误:断言失败:最新的off et-922337203685477580
我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\
我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我
我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。
我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。