我已经设置了Spark Structured Streaming(Spark2.3.2)来阅读Kafka(2.0.0)。如果消息在Spark streaming作业启动之前就进入了主题,我无法从主题的开始消费。这是Spark streaming的预期行为吗?它忽略了Spark streaming作业初始运行之前产生的Kafka消息(即使带有.选项(“StratingOffSets”,“Earlish”))?
>
在启动流作业之前,创建test
主题(单个代理,单个分区)并生成指向该主题的消息(在我的示例中为3条html" target="_blank">消息)。
执行下面的spark scala代码。
// Local
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("failOnDataLoss","false")
.option("stratingOffsets","earliest")
.option("subscribe", "test")
.load()
// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
.trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
.start()
我希望流从offset=1开始。但是,它从offset=3开始读取。您可以看到,kafka客户端实际上正在重置起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583-[Consumer ClientID=Consumer-2,GroupID=spark-kafka-source-e948eee9-3024-4F14-bcb8-75b80d43cbb1-181544888-driver-0]将分区测试-0的偏移量重置为偏移量3。
我可以看到spark流处理我在启动流作业后生成的消息。
2019-06-18 21:22:57 INFO AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
2019-06-18 21:22:57 INFO AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
2019-06-18 21:22:57 INFO MicroBatchExecution:54 - Starting new streaming query.
2019-06-18 21:22:57 INFO Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
2019-06-18 21:22:57 INFO AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
2019-06-18 21:22:57 INFO ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
2019-06-18 21:22:57 INFO AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
2019-06-18 21:22:57 INFO AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
2019-06-18 21:22:57 INFO ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO KafkaSource:54 - Initial offsets: {"test":{"0":3}}
2019-06-18 21:22:58 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
2019-06-18 21:22:58 INFO KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("subscribe", "test")
.load()
df.count // Long = 3
哈哈,这是一个简单的错别字:“stratingoffsets”应该是“startingoffsets”
我已经设置了Spark结构流(Spark 2.3.2)来阅读Kafka(2.0.0)。如果消息在Spark流作业开始之前进入主题,我无法从主题的开始消费。Spark streaming会忽略在初始运行Spark Stream作业之前产生的Kafka消息(即使使用。选项(“StratingoffSets”、“reasly”)),这是否是预期的Spark streaming行为? > 在开始流作业之前
问题内容: 在纯Java SE 6环境中: Eclipse控制台中未显示任何内容。 l.info(“”) 及以上的作品就好了,但低于任何 罚款 只是似乎没有工作。有什么问题吗?TIA。 问题答案: 即使Logger级别设置为ALL,ConsoleHandler(记录器上的默认Handler)仍然具有INFO的默认级别。这来自 JAVA_HOME / jre / lib中 的默认logging.pr
环境: Eclipse Juno Junit 4.11 maven 3.0.4 问题: 我相信在Eclipse的junit运行器中运行单元测试时遇到了类加载问题。我的具体问题源于这段代码: 上面的代码最终将使用一个类的名称调用,该类位于maven依赖项中(依赖项只是一个java bean的集合)。当我运行mvn测试时,这段代码成功运行,但是,如果我使用eclipse运行junit测试,我会收到一个
问题内容: 我有一个使用Hibernate为HSQL db生成表的应用程序(因为我的应用程序仍在开发中)。在我的域模型中,我已经设定 当我使用DBVisualizer打开数据库时,可以看到所有设置都正确,除了它无法正常工作外,我的列接受的长度超过10个字符的值。当我尝试在DBVisualier中手动运行查询时,它会失败,但是应该进入休眠状态。 同样非常奇怪的是,当我使用文件(而不是在内存db中)以
问题内容: 我正在尝试学习Hibernate的工作方式,并且几乎陷入了无法接受的学习曲线。我看不到如何使Hibernate尊重我的对象的auto_increment策略。而是使用现有ID(从1开始)覆盖数据库中的条目。 我有一个简单的对象,由定义如下的MySQL表支持: 我已经确认使用SQL()手动插入多个Foo对象是正确的。 我的Java类具有使用如下注释来指定的ID: 然后,我执行一些测试代码
我无法使用Gradle强制生成依赖项的版本。我的目标是使用0.20.0版本。发布了Spring HATEOAS库的,但尽管我付出了所有的努力,它仍然解析为0.19.0。释放。 我尝试了许多策略,包括孤立的策略和相互结合的策略。这些策略包括但可能不限于以下策略(请注意,在所有情况下都在文件中定义,该文件位于声明Spring HATEOAS依赖项的模块目录的父目录中): #1(在声明依赖关系的模块的b