当前位置: 首页 > 知识库问答 >
问题:

带有Kafka的Spark结构流不尊重startingoffset=“最早”

濮阳茂材
2023-03-14

我已经设置了Spark结构流(Spark 2.3.2)来阅读Kafka(2.0.0)。如果消息在Spark流作业开始之前进入主题,我无法从主题的开始消费。Spark streaming会忽略在初始运行Spark Stream作业之前产生的Kafka消息(即使使用。选项(“StratingoffSets”、“reasly”)),这是否是预期的Spark streaming行为?

>

  • 在开始流作业之前,创建test主题(单个代理、单个分区),并为该主题生成消息(在我的示例中为3条消息)。

    // Local
    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9097")
      .option("failOnDataLoss","false")
      .option("stratingOffsets","earliest")
      .option("subscribe", "test")
      .load()
    
    // Sink Console
    val ds = df.writeStream.format("console").queryName("Write to console")
      .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
      .start()
    

    我希望流从offset=1开始。但是,它从offset=3开始读取。您可以看到kafka客户机实际上正在重置起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583-[Consumer clientid=consumer-2,groupid=spark-kafka-source-e948ee9-3024-4f14-bcb8-75b80d43cbb1-181544888-driver-0]将分区test-0的偏移量重置为偏移量3。

    我可以看到spark stream处理我在启动流作业后生成的消息。

    Spark streaming会忽略在初始运行Spark Stream作业之前生成的Kafka消息(即使使用.option(“StratingoffSets”,“reasial”)),这是否是预期的Spark streaming行为?

    2019-06-18 21:22:57 INFO  AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
    2019-06-18 21:22:57 INFO  AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
    2019-06-18 21:22:57 INFO  MicroBatchExecution:54 - Starting new streaming query.
    2019-06-18 21:22:57 INFO  Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
    2019-06-18 21:22:57 INFO  AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
    2019-06-18 21:22:57 INFO  ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
    2019-06-18 21:22:57 INFO  AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
    2019-06-18 21:22:57 INFO  AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
    2019-06-18 21:22:57 INFO  ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
    2019-06-18 21:22:57 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
    2019-06-18 21:22:58 INFO  KafkaSource:54 - Initial offsets: {"test":{"0":3}}
    2019-06-18 21:22:58 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
    2019-06-18 21:22:58 INFO  MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
    2019-06-18 21:22:58 INFO  KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}
    

    我能够确认批处理模式从一开始就读取--所以Kafka保留配置没有问题

    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9097")
      .option("subscribe", "test")
      .load()
    
    df.count // Long = 3
    
  • 共有1个答案

    马浩淼
    2023-03-14

    哈哈,这是一个简单的错别字:“stratingoffsets”应该是“startingoffsets”

     类似资料:
    • 我已经设置了Spark Structured Streaming(Spark2.3.2)来阅读Kafka(2.0.0)。如果消息在Spark streaming作业启动之前就进入了主题,我无法从主题的开始消费。这是Spark streaming的预期行为吗?它忽略了Spark streaming作业初始运行之前产生的Kafka消息(即使带有.选项(“StratingOffSets”,“Earlis

    • 我已经将kafka代理从0.8升级到0.11,现在我正在尝试升级火花流作业代码以与新的kafka兼容-我正在使用火花1.6.2-。 我搜索了很多步骤来执行此升级,我没有找到任何官方或非官方的文章。 我发现唯一有用的文章是这篇,但是它提到了spark 2.2和kafka 0.10,但是我得到一行文字说 但是,由于较新的集成使用新的 Kafka 使用者 API 而不是简单的 API,因此在用法上存在显

    • 我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。 我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dep

    • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

    • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

    • 我正在使用Spark结构化流媒体阅读Kafka主题。 我错过什么了吗?