当前位置: 首页 > 知识库问答 >
问题:

使用Spark结构化流媒体编写时捕获Kafka偏移量

邵逸明
2023-03-14

我正在使用Spark 2.2上的Spark结构化流媒体将文件从HDFS目录流式传输到Kafka主题。我想为我写的主题数据捕捉Kafka偏移量。

我正在使用

val write = jsonDF
.writeStream.format("kafka")
.option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint"))
.option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer"))
.option("topic", Config().getString(domain + ".kafkaTopic"))
.start()

给Kafka写信。

当我利用

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.id) 
  }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.id)
  }
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress: " + queryProgress.progress)
  }
})

为了捕获流的进度信息,检索到的信息与Kafka中创建的偏移量不相关。

我假设这是因为流提供的信息实际上是关于我正在使用的文件流的,而与Kafka中编写的内容无关。

有没有一种Spark Structure流式处理方法可以捕获写入Kafka时生成的偏移量信息?

添加示例:当我在创建主题后从源1运行三行数据时,我得到:
运行1:开始偏移量:null,结束偏移量:{“logOffset”:0}开始偏移量:{“logOffset”:0},结束偏移量:{“logOffset”:0}

 Kafka Says:
 ruwe:2:1
 ruwe:1:1
 ruwe:0:1

运行2;

  Start Offset: {"logOffset":0}, End offset: {"logOffset":1}
  Start Offset: {"logOffset":1}, End offset: {"logOffset":1}

 Kafka Says:
 ruwe:2:2
 ruwe:1:2
 ruwe:0:2

运行3:

  Start Offset: {"logOffset":1}, End offset: {"logOffset":2}
  Start Offset: {"logOffset":2}, End offset: {"logOffset":2}

 Kafka Says:
 ruwe:2:3
 ruwe:1:3
 ruwe:0:3

然后我用来自不同来源的相同程序运行数据并接收

  Start Offset: null, End offset: {"logOffset":0}
  Start Offset: {"logOffset":0}, End offset: {"logOffset":0}

  and of course Kafka continued to increment

这表示Spark正在报告基于源的信息

我想知道目标中创建了什么。

共有2个答案

公孙向荣
2023-03-14

在阅读Spark结构流的代码后,特别是Kafka KafkaWriter、KafkaWriteTask和CachedKafkaProducer,Spark不会消耗回调中从KafkaProducer返回的偏移量。他们定义的回调只捕获异常。基于此,我想说在当前版本2.2中它无法完成。

他们提供的信息都是围绕查询源而不是目标提供的。

景子安
2023-03-14

有没有一种Spark Structure流式处理方法可以捕获写入Kafka时生成的偏移量信息?

是的,在onQuerygres中,您需要查看StreamingQueryProgress.sources,它是一个Array[Sourcegres]。它有两个字符串,start OffsetendOffset,它们是您可以解析的JSON:

sparkSession.streams.addListener(new StreamingQueryListener {override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ???

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val source = event.progress.sources.headOption
    source.map(src => println(s"Start Offset: ${src.startOffset}, End offset: ${src.endOffset}"))
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
})

JSON具有以下结构:

"startOffset" : {
  "topic-name" : {
    "0" : 1,
    "1" : 22,
    "2" : 419,
  }
},
"endOffset" : {
  "topic-name" : {
    "0" : 10,
    "1" : 100,
    "2" : 1000
  }
}
 类似资料:
  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 使用lib:运行spark结构化流时,我们不断收到有关当前偏移量获取的错误: 引起原因:org.apache.spark.Spark异常:由于阶段失败而中止作业:阶段0.0中的任务0失败4次,最近的失败:阶段0.0中丢失任务0.3(TID 3,qa2-hdp-4.acuityads.org,执行器2):java.lang.断言错误:断言失败:最新的off et-922337203685477580

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。