当前位置: 首页 > 知识库问答 >
问题:

如何制作Spark流媒体(Spark 1.0.0)从Kafka(Kafka Broker 0.8.1)读取最新数据

廖华翰
2023-03-14

我的火花流应用程序从Kafka获取数据并对其进行处理。

如果应用程序失败,大量数据存储在Kafka中,并且在Spark Streaming应用程序的下一次启动时,它会崩溃,因为一次消耗了太多数据。由于我的应用程序不关心过去的数据,因此只消耗当前(最新)数据完全没关系。

我找到了“auto.reset.offest”选项,它在Spark中的行为几乎没有什么不同。如果配置了zookeeper,它会删除存储在zookeeper中的偏移量。然而,尽管它的行为出人意料,但它应该从删除后的最新版本中获取数据。

但我发现不是。我看到所有的偏移量在消费数据之前都被清理了。然后,因为默认行为,它应该按照预期获取数据。但它仍然因为数据太多而崩溃。

当我使用“Kafka Console Consumer”清理偏移量并使用最新版本的数据,并运行我的应用程序时,它会按预期工作。

所以它看起来“auto.reset.offset”不起作用,spark streaming中的Kafka消费者从“最小”偏移量获取数据作为默认值。

你知道如何使用spark流媒体中最新的Kafka数据吗?

我使用的是Spark 1.0.0和Kafka-2.10-0.8.1。

提前感谢。

共有1个答案

汝繁
2023-03-14

我想你拼错了财产名称。正确的键是auto.offset.reset而不是auto.reset.offest

更多信息请点击此处:http://kafka.apache.org/documentation.html#configuration

希望这有帮助。

 类似资料:
  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 我正在读这篇博文: http://blog.jaceklaskowski.pl/2015/07/20/real-time-data-processing-using-apache-kafka-and-spark-streaming.html 它讨论了如何使用Spark Streaming和Apache Kafka进行一些近实时处理。我完全理解这篇文章。它确实展示了我如何使用Spark Stream

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https:

  • 我有一个Kafka分区,和一个parkStreaming应用程序。一个服务器有10个内核。当火花流从Kafka收到一条消息时,后续过程将需要5秒钟(这是我的代码)。所以我发现火花流读取Kafka消息很慢,我猜当火花读出一条消息时,它会等到消息被处理,所以读取和处理是同步的。我想知道我可以异步读取火花吗?这样从Kafka读取的数据就不会被后续处理拖动。然后火花会很快消耗来自Kafka的数据。然后我可

  • 1流媒体和Kafka broker版本0.8.2.1,我在AWS上为spark和Kafka提供了单独的服务器。 使用直接进近,我希望从流媒体中获得30个字符串,但实际接收范围只有15-25个。交叉检查Kafka消费者在300秒内显示30个字符串。还有小溪。foreachRDD{rdd= 获取最终数据背后有什么问题。我正在使用火花会话创建sc和ssc。 谢谢你。