我正在使用spark structured streaming(2.2.1)来消费来自Kafka(0.10)的主题。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", fromKafkaServers)
.option("subscribe", topicName)
.option("startingOffset", "earliest")
.load()
我的检查点位置设置在外部HDFS目录上。在某些情况下,我希望重新启动流式应用程序,从一开始就消费数据。然而,即使我从HDFS目录中删除所有检查点数据并重新提交jar,Spark仍然能够找到我上次使用的偏移量并从那里恢复。偏移量还在哪里?我怀疑与Kafka消费者ID有关。但是,我无法为每个spark文档设置spark structured streaming group.id,似乎所有订阅相同主题的应用程序都分配给一个使用者组。如果我希望有两个独立的流作业运行,并订阅同一主题,该怎么办?
您有一个错误:)它是startingOffsets
我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。
使用lib:运行spark结构化流时,我们不断收到有关当前偏移量获取的错误: 引起原因:org.apache.spark.Spark异常:由于阶段失败而中止作业:阶段0.0中的任务0失败4次,最近的失败:阶段0.0中丢失任务0.3(TID 3,qa2-hdp-4.acuityads.org,执行器2):java.lang.断言错误:断言失败:最新的off et-922337203685477580
我正在运行以下scala代码: 我知道firstStruct是structType,StructFields的一个名称是“name”,但在尝试强制转换时似乎失败了。我被告知spark/hive结构与scala不同,但为了使用structType,我需要 所以我想他们应该是同一种类型的。 我看了看这里:https://github.com/apache/spark/blob/master/sql/c
我正在使用Spark 2.2上的Spark结构化流媒体将文件从HDFS目录流式传输到Kafka主题。我想为我写的主题数据捕捉Kafka偏移量。 我正在使用 给Kafka写信。 当我利用 为了捕获流的进度信息,检索到的信息与Kafka中创建的偏移量不相关。 我假设这是因为流提供的信息实际上是关于我正在使用的文件流的,而与Kafka中编写的内容无关。 有没有一种Spark Structure流式处理方
我不能用火花流运行Kafka。以下是我迄今为止采取的步骤: > 将此行添加到- Kafka版本:kafka_2.10-0.10.2.2 Jar文件版本:spark-streaming-kafka-0-8-assembly_2.10-2.2.0。罐子 Python代码: 但我仍然得到以下错误: 我做错了什么?
我想出一个例外: 在这个程序中,我尝试从hdfs路径读取记录,并将它们保存到Kafka中。问题是当我移除关于向Kafka发送记录的代码时,它运行得很好。我错过了什么?