当前位置: 首页 > 知识库问答 >
问题:

火花流从Kafka源返回到检查点或重绕

翟丰茂
2023-03-14

当streaming Spark DStreams作为来自Kafka源的消费者时,可以检查Spark上下文,因此当应用程序崩溃(或受到kill-9的影响)时,应用程序可以从上下文检查点恢复。但如果应用程序“意外地部署了错误的逻辑”,您可能想要倒回到最后一个主题+分区+偏移量,以重播某个Kafka主题的分区偏移量位置的事件,这些位置在“错误逻辑”之前正常工作。当检查点生效时,流式应用程序如何被重绕到最后的“好点”(主题+分区+偏移量)?

注意:在I(心脏)日志中,Jay Kreps写到使用并行的消费者(组)进程,它从发散的Kafka偏移量位置开始,直到赶上原始的,然后杀死原始的。(关于从某些分区/偏移量位置开始的第二个火花流进程是什么样子的?)

侧栏:这个问题可能与中流更改配置有关,因为可能需要部署一个类似的机制。

共有1个答案

羊舌兴文
2023-03-14

您将无法倒回正在运行的sparkstreamingcontext中的流。记住以下几点很重要(直接从文档中):

  • 一旦上下文启动,就不能设置或添加新的流计算。
  • 上下文一旦停止,就无法重新启动。
  • 一个JVM中同时只能有一个StreamingContext处于活动状态。
  • StreamingContext上的
  • stop()也会停止SparkContext。若要仅停止StreamingContext,请将stop()的可选参数stopSparkContext设置为false。
  • 只要在创建下一个StreamingContext之前停止前一个StreamingContext

相反,您将不得不停止当前流,并创建一个新的流。您可以使用createdirectstream的某个版本从特定的偏移量集启动流,该版本采用fromoffsets参数,其签名为map[TopicAndPartition,Long]--它是主题和分区映射的起始偏移量。

另一种理论上的可能性是使用kafkautils.createrdd,它将偏移量范围作为输入。假设您的“坏逻辑”是从偏移量X开始的,然后在偏移量Y处修复的。对于某些用例,您可能只想用从X到Y的偏移量来执行createrdd并处理这些结果,而不是尝试将其作为流来执行。

 类似资料:
  • 我们正在构建一个使用Spark Streaming和Kafka的容错系统,并且正在测试Spark Streaming的检查点,以便在Spark作业因任何原因崩溃时可以重新启动它。下面是我们的spark过程的样子: Spark Streaming每5秒运行一次(幻灯片间隔),从Kafka读取数据 Kafka每秒大约接收80条消息 我们想要实现的是一个设置,在这个设置中,我们可以关闭spark流作业(

  • 我在生产中遇到检查点问题,当 spark 无法从_spark_metadata文件夹中找到文件时 已经提出了一个问题,但目前还没有解决方案。 在检查点文件夹中,我看到批次29尚未提交,所以我可以从检查点的、和/或中删除一些内容,以防止火花因缺少文件而失败?

  • 为什么以及何时会选择将Spark流媒体与Kafka结合使用? 假设我有一个系统通过Kafka每秒接收数千条消息。我需要对这些消息应用一些实时分析,并将结果存储在数据库中。 我有两个选择: > < li> 创建我自己的worker,该worker从Kafka读取消息,运行分析算法并将结果存储在DB中。在Docker时代,只需使用scale命令就可以轻松地在我的整个集群中扩展这个工作线程。我只需要确保

  • 它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka

  • 我正在使用spark structured streaming(2.2.1)来消费来自Kafka(0.10)的主题。 我的检查点位置设置在外部HDFS目录上。在某些情况下,我希望重新启动流式应用程序,从一开始就消费数据。然而,即使我从HDFS目录中删除所有检查点数据并重新提交jar,Spark仍然能够找到我上次使用的偏移量并从那里恢复。偏移量还在哪里?我怀疑与Kafka消费者ID有关。但是,我无法

  • 我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-