我已经设置了Spark结构流(Spark 2.3.2)来阅读Kafka(2.0.0)。如果消息在Spark流作业开始之前进入主题,我无法从主题的开始消费。Spark streaming会忽略在初始运行Spark Stream作业之前产生的Kafka消息(即使使用。选项(“StratingoffSets”、“reasly”)),这是否是预期的Spark streaming行为?
>
在开始流作业之前,创建test
主题(单个代理、单个分区),并为该主题生成消息(在我的示例中为3条消息)。
// Local
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("failOnDataLoss","false")
.option("stratingOffsets","earliest")
.option("subscribe", "test")
.load()
// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
.trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
.start()
我希望流从offset=1开始。但是,它从offset=3开始读取。您可以看到kafka客户机实际上正在重置起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583-[Consumer clientid=consumer-2,groupid=spark-kafka-source-e948ee9-3024-4f14-bcb8-75b80d43cbb1-181544888-driver-0]将分区test-0的偏移量重置为偏移量3。
我可以看到spark stream处理我在启动流作业后生成的消息。
Spark streaming会忽略在初始运行Spark Stream作业之前生成的Kafka消息(即使使用.option(“StratingoffSets”,“reasial”)
),这是否是预期的Spark streaming行为?
2019-06-18 21:22:57 INFO AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
2019-06-18 21:22:57 INFO AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
2019-06-18 21:22:57 INFO MicroBatchExecution:54 - Starting new streaming query.
2019-06-18 21:22:57 INFO Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
2019-06-18 21:22:57 INFO AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
2019-06-18 21:22:57 INFO ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
2019-06-18 21:22:57 INFO AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
2019-06-18 21:22:57 INFO AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
2019-06-18 21:22:57 INFO ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO KafkaSource:54 - Initial offsets: {"test":{"0":3}}
2019-06-18 21:22:58 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
2019-06-18 21:22:58 INFO KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}
我能够确认批处理模式从一开始就读取--所以Kafka保留配置没有问题
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("subscribe", "test")
.load()
df.count // Long = 3
哈哈,这是一个简单的错别字:“stratingoffsets”应该是“startingoffsets”
我已经设置了Spark Structured Streaming(Spark2.3.2)来阅读Kafka(2.0.0)。如果消息在Spark streaming作业启动之前就进入了主题,我无法从主题的开始消费。这是Spark streaming的预期行为吗?它忽略了Spark streaming作业初始运行之前产生的Kafka消息(即使带有.选项(“StratingOffSets”,“Earlis
我已经将kafka代理从0.8升级到0.11,现在我正在尝试升级火花流作业代码以与新的kafka兼容-我正在使用火花1.6.2-。 我搜索了很多步骤来执行此升级,我没有找到任何官方或非官方的文章。 我发现唯一有用的文章是这篇,但是它提到了spark 2.2和kafka 0.10,但是我得到一行文字说 但是,由于较新的集成使用新的 Kafka 使用者 API 而不是简单的 API,因此在用法上存在显
我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。 我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dep
我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
我正在使用Spark结构化流媒体阅读Kafka主题。 我错过什么了吗?