当前位置: 首页 > 知识库问答 >
问题:

Spark Structured Streaming with Kafka不尊重startingoffset=“最早”

娄浩荡
2023-03-14

我已经设置了Spark Structured Streaming(Spark2.3.2)来阅读Kafka(2.0.0)。如果消息在Spark streaming作业启动之前就进入了主题,我无法从主题的开始消费。这是Spark streaming的预期行为吗?它忽略了Spark streaming作业初始运行之前产生的Kafka消息(即使带有.选项(“StratingOffSets”,“Earlish”))?

>

  • 在启动流作业之前,创建test主题(单个代理,单个分区)并生成指向该主题的消息(在我的示例中为3条html" target="_blank">消息)。

    执行下面的spark scala代码。

    // Local
    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9097")
      .option("failOnDataLoss","false")
      .option("stratingOffsets","earliest")
      .option("subscribe", "test")
      .load()
    
    // Sink Console
    val ds = df.writeStream.format("console").queryName("Write to console")
      .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
      .start()
    

    我希望流从offset=1开始。但是,它从offset=3开始读取。您可以看到,kafka客户端实际上正在重置起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583-[Consumer ClientID=Consumer-2,GroupID=spark-kafka-source-e948eee9-3024-4F14-bcb8-75b80d43cbb1-181544888-driver-0]将分区测试-0的偏移量重置为偏移量3。

    我可以看到spark流处理我在启动流作业后生成的消息。

    2019-06-18 21:22:57 INFO  AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
    2019-06-18 21:22:57 INFO  AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
    2019-06-18 21:22:57 INFO  MicroBatchExecution:54 - Starting new streaming query.
    2019-06-18 21:22:57 INFO  Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
    2019-06-18 21:22:57 INFO  AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
    2019-06-18 21:22:57 INFO  ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
    2019-06-18 21:22:57 INFO  AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
    2019-06-18 21:22:57 INFO  AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
    2019-06-18 21:22:57 INFO  ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
    2019-06-18 21:22:57 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
    2019-06-18 21:22:58 INFO  KafkaSource:54 - Initial offsets: {"test":{"0":3}}
    2019-06-18 21:22:58 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
    2019-06-18 21:22:58 INFO  MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
    2019-06-18 21:22:58 INFO  KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}
    
    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9097")
      .option("subscribe", "test")
      .load()
    
    df.count // Long = 3
    
  • 共有1个答案

    匡翰
    2023-03-14

    哈哈,这是一个简单的错别字:“stratingoffsets”应该是“startingoffsets”

     类似资料:
    • 我已经设置了Spark结构流(Spark 2.3.2)来阅读Kafka(2.0.0)。如果消息在Spark流作业开始之前进入主题,我无法从主题的开始消费。Spark streaming会忽略在初始运行Spark Stream作业之前产生的Kafka消息(即使使用。选项(“StratingoffSets”、“reasly”)),这是否是预期的Spark streaming行为? > 在开始流作业之前

    • 问题内容: 在纯Java SE 6环境中: Eclipse控制台中未显示任何内容。 l.info(“”) 及以上的作品就好了,但低于任何 罚款 只是似乎没有工作。有什么问题吗?TIA。 问题答案: 即使Logger级别设置为ALL,ConsoleHandler(记录器上的默认Handler)仍然具有INFO的默认级别。这来自 JAVA_HOME / jre / lib中 的默认logging.pr

    • 环境: Eclipse Juno Junit 4.11 maven 3.0.4 问题: 我相信在Eclipse的junit运行器中运行单元测试时遇到了类加载问题。我的具体问题源于这段代码: 上面的代码最终将使用一个类的名称调用,该类位于maven依赖项中(依赖项只是一个java bean的集合)。当我运行mvn测试时,这段代码成功运行,但是,如果我使用eclipse运行junit测试,我会收到一个

    • 问题内容: 我有一个使用Hibernate为HSQL db生成表的应用程序(因为我的应用程序仍在开发中)。在我的域模型中,我已经设定 当我使用DBVisualizer打开数据库时,可以看到所有设置都正确,除了它无法正常工作外,我的列接受的长度超过10个字符的值。当我尝试在DBVisualier中手动运行查询时,它会失败,但是应该进入休眠状态。 同样非常奇怪的是,当我使用文件(而不是在内存db中)以

    • 问题内容: 我正在尝试学习Hibernate的工作方式,并且几乎陷入了无法接受的学习曲线。我看不到如何使Hibernate尊重我的对象的auto_increment策略。而是使用现有ID(从1开始)覆盖数据库中的条目。 我有一个简单的对象,由定义如下的MySQL表支持: 我已经确认使用SQL()手动插入多个Foo对象是正确的。 我的Java类具有使用如下注释来指定的ID: 然后,我执行一些测试代码

    • 我无法使用Gradle强制生成依赖项的版本。我的目标是使用0.20.0版本。发布了Spring HATEOAS库的,但尽管我付出了所有的努力,它仍然解析为0.19.0。释放。 我尝试了许多策略,包括孤立的策略和相互结合的策略。这些策略包括但可能不限于以下策略(请注意,在所有情况下都在文件中定义,该文件位于声明Spring HATEOAS依赖项的模块目录的父目录中): #1(在声明依赖关系的模块的b