当前位置: 首页 > 知识库问答 >
问题:

结构化流媒体从kafka获取了错误的当前偏移量

孔欣荣
2023-03-14

使用lib:“org.apache.spark”%%“spark-sql-kafka-0-10”%%“2.4.0”运行spark结构化流时,我们不断收到有关当前偏移量获取的错误:

引起原因:org.apache.spark.Spark异常:由于阶段失败而中止作业:阶段0.0中的任务0失败4次,最近的失败:阶段0.0中丢失任务0.3(TID 3,qa2-hdp-4.acuityads.org,执行器2):java.lang.断言错误:断言失败:最新的off et-9223372036854775808在scala不等于-1。断言(Predef.scala:170)org.apache.spark.sql.kafka010。org.apache.spark.sql.kafka010的eader.resolve范围(KafkaMicroBatchReader.scala:371)。KafkaMicroBatchInput分区阅读器。)KafkaMicroBatchReader.scala:329)在org.apache.spark.sql.kafka010。KafkaMicroBatchInputPartition.create分区读取器(KafkaMicroBatchReader.scala:314)org.apache.spark.sql.execution.datasources.v2。在org.apache.spark.rdd.RDD. computeOrReadCheckpoint(RDD. scala: 324)在org.apache.spark.rdd.RDD. iterator(RDD. scala: 288)在org.apache.spark.rdd.MapPartionsRDD. comute(MapPartionsRDD. scala: 52)在org.apache.spark.rdd.RDD. computeOrReadCheckpoint(RDD. scala: 324)在org.apache.spark.rdd.RDD. iterator(RDD. scala: 288)在org.apache.spark.rdd.MapPartionsRDD. comute(MapPartionsRDD. scala: 52)在org.apache.spark.rdd.RDD. computeOrReadCheckpoint(RDD. scala: 324)在org. apache. spak. rdd.rdd. iterator(rdd. scala: 288)at org. apache. spark. rdd。scala: 52)在org. apache. spark. rdd。RDD. computeOrReadCheckpoint(RDD. scala: 324)at org. apache. spark. rdd。rdd. iterator(rdd. scala: 288)at org. apache. spark.调度器。ShuffleMapTasks. runWork(ShuffleMapTasks. scala: 99)在org. apache. spak.调度器。ShuffleMapTasks. runWork(ShuffleMapTasks. scala: 55)在org. apache. spak.调度器。在org. apache. spak. exitor上运行(Task. scala: 121)。执行者$TaskRunner$anonfun10美元。应用(Executor. scala: 402)在org. apache. spak. util。Utils$. try BackSafe最终(Utils. scala: 1360)在org. apache. spak. exitor。执行器$TaskRunner. run(Executor. scala: 408)在java. util. con当前。ThreadPoolExecutor. runWorker(ThreadPoolExecutor. java: 1142)在java. util. con当前。在java. lang上运行(ThreadPoolExecator. java: 617)。线程.运行(线程. java: 745)

出于某种原因,看来getchLatestOffset返回了Long。MIN_VALUE其中一个分区。我检查了结构化流检查点,这是正确的,它是当前的可用偏移设置为Long。MIN_VALUE。

Kafka代理版本:1.1.0。我们使用的lib:

{{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" }}

如何复制:基本上,我们开始了一个结构化的流媒体,订阅了一个由4个分区组成的主题。然后在主题中产生了一些消息,作业崩溃并像上面一样记录了堆栈跟踪。

此外,提交的偏移量看起来很好,正如我们在日志中看到的:

=== Streaming Query ===
Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 31878627-d473-4ee8-955d-d4d3f3f45eb9]
Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":1}}}
Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":-9223372036854775808}}}

所以火花流记录了分区的正确值: 0,但是从kafka返回的当前可用偏移显示为Long。MIN_VALUE。

共有1个答案

松高爽
2023-03-14

发现问题,这是由于spark结构化流媒体库中存在整数溢出。详情如下:https://issues.apache.org/jira/browse/SPARK-26718

 类似资料:
  • 我正在使用Spark 2.2上的Spark结构化流媒体将文件从HDFS目录流式传输到Kafka主题。我想为我写的主题数据捕捉Kafka偏移量。 我正在使用 给Kafka写信。 当我利用 为了捕获流的进度信息,检索到的信息与Kafka中创建的偏移量不相关。 我假设这是因为流提供的信息实际上是关于我正在使用的文件流的,而与Kafka中编写的内容无关。 有没有一种Spark Structure流式处理方

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我正在使用spark structured streaming(2.2.1)来消费来自Kafka(0.10)的主题。 我的检查点位置设置在外部HDFS目录上。在某些情况下,我希望重新启动流式应用程序,从一开始就消费数据。然而,即使我从HDFS目录中删除所有检查点数据并重新提交jar,Spark仍然能够找到我上次使用的偏移量并从那里恢复。偏移量还在哪里?我怀疑与Kafka消费者ID有关。但是,我无法

  • 我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https:

  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我