当前位置: 首页 > 知识库问答 >
问题:

连续触发器不适用于具有有效接收器的 Spark 结构化流式处理

归翔
2023-03-14

我正在尝试将连续触发器与 Spark 结构化流式处理查询结合使用。我得到的错误是,火花消费者在处理数据时找不到适当的偏移量。如果没有此触发器,查询将正常运行(如预期)。

我的工作:

从Kafka主题读取数据:

val inputStream: DataStreamReader = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("subscribe", "input_topic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")

val inputDF: DataFrame = inputStream.load()

数据写入Kafka主题:

val outputStream: DataStreamWriter[Row] = inputDF
      .writeStream
      .trigger(Trigger.Continuous("1 second"))
      .queryName("some_name_fresh_each_run")
      .option("checkpointLocation", "path_to_dir_fresh_each_run")
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("failOnDataLoss", value = false)
      .option("startingOffsets", "latest")
      .option("topic", "output_topic")

val streamingQuery = outputStream.start()

所以我基本上没有做什么特别的事情——只是将输入数据传输到输出主题,而没有任何转换或无效操作。

我得到了什么:

在executor日志中,我看到很多这样的消息:

21/10/06 14:16:55 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894

21/10/06 14:16:55 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor-1, groupId=spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor] Resetting offset for partition input_topic-7 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.

21/10/06 14:16:55 INFO KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor-2, groupId=spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor] Seeking to offset 0 for partition input_topic-11
2
1/10/06 14:16:55 WARN KafkaDataConsumer: Some data may be lost. Recovering from the earliest offset: 0

21/10/06 14:16:55 WARN KafkaDataConsumer: 
The current available offset range is AvailableOffsetRange(0,0).
 Offset 0 is out of range, and records in [0, 9223372036854775807) will be
 skipped (GroupId: spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor, TopicPartition: input_topic-7). 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you want your streaming query to fail on such cases, set the source
 option "failOnDataLoss" to "true"

尽管有最新的偏移设置,但仍然有这样的消息:

21/10/06 14:16:55 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor-3, groupId=spark-kafka-source-1a733ff4-c76c-488d-86b2-6829b29e789d--1476943634-executor] 
Seeking to EARLIEST offset of partition input_topic-5

我知道,当 Kafka 主题出现问题或其偏移量中断或数据真正丢失时,通常会出现 failOnData丢失错误。但是我删除了本地 Kafka 代理中的所有数据,从头开始重新创建所有使用的主题(input_topicoutput_topic),检查其中根本没有数据,错误仍然出现。

我尝试了什么(以不同的顺序尝试了很多次):

  1. 更改/删除流式处理查询的检查点位置。
  2. 完全干净的Kafka主题,甚至整个经纪人数据。
  3. 完全删除所有消费组。
  4. 通过运行以下命令,手动将偏移量重置为新创建的主题中的最新偏移量:
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group <copied_group_name_from_logs> \
  --reset-offsets --execute --to-latest --topic input_topic

无济于事:(最令人困惑的是主题是空的,没有数据,唯一可用的偏移量是0。

我做什么作为解决方法

不要使用连续触发:(

使用相同的设置、相同的主题、相同的查询、相同的Spark群集……等等,我在没有.trigger(trigger.Continuous(“1秒”))的情况下运行我的应用程序(只是没有触发器,这意味着Spark将使用默认触发器),一切都像一个魔咒一样运行。

为什么我需要这个:

好吧,我想在使用Spark处理流数据时实现约1ms的有希望的延迟。

环境:

  • 火花版本: 3.0.1
  • 斯卡拉版本: 2.12.10
  • Kafka版本(本地运行):2.7.0
  • 主题中的分区数: 20

我知道这是实验性的功能,但我真的希望它能像promise的那样被使用。请帮帮忙!

共有1个答案

周和志
2023-03-14

问题是,您在本地计算机上运行这个示例时,会得到警告:

21/10/06 14:16:55 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894

为了连续处理数据,需要 KafkaData 消费者可以在不间断线程中运行。在警告下,它说:

“连续处理引擎启动多个长时间运行的任务,这些任务不断从源读取数据,处理数据并不断写入接收器。查询所需的任务数取决于查询可以从源并行读取的分区数。因此,在开始连续处理查询之前,必须确保集群中有足够的内核来并行执行所有任务。例如,如果您正在读取具有 10 个分区的 Kafka 主题,则群集必须至少具有 10 个核心才能使查询取得进展。

显然,您需要一个合适的集群来测试这个实验性功能

注意,我在本地测试了相同的设置,源和宿主题只有一个分区,但我仍然得到KafkaDataConsumer没有在UninterruptableThread中运行警告,尽管我的CPU中有6个内核。显然,这也不起作用,所以可能只有集群才能帮助解决问题。

 类似资料:
  • 我正在使用带更新模式的结构化流媒体读取Kafka主题中的数据流。,然后做一些改变。 然后我创建了一个jdbc接收器,用追加模式在mysql接收器中推送数据。问题是我如何告诉我的接收器让它知道这是我的主键,并基于它进行更新,这样我的表就不会有任何重复的行。

  • 我正在尝试使用Spark结构化流的功能,触发一次,来模拟一个类似的批处理设置。然而,当我运行我的初始批处理时,我遇到了一些麻烦,因为我有很多历史数据,因此我也使用了这个选项。选项(" cloud files . includeexistingfiles "," true ")也处理现有文件。 因此,我的初始批处理变得非常大,因为我无法控制批处理的文件量。 我也尝试过使用选项 cloudFiles.

  • 使用最新的 kafka 和 confluent jdbc 接收器连接器。发送一个非常简单的 Json 消息: 但是出现错误: Jsonlint说Json是有效的。我在 kafka 配置中保留了 json 。有什么指示吗?

  • 场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?

  • 我正在使用Kafka源和接收器连接器创建一个数据管道。源连接器从SQL数据库消费并发布到主题,而接收器连接器订阅主题并放入其他SQL数据库。表有16 GB的数据。现在的问题是,数据不能从一个数据库传输到另一个数据库。但是,如果表的大小很小,比如1000行,那么数据可以成功传输。 源连接器配置: 源连接器日志: 有人能指导我如何调整Kafka源连接器以传输大数据吗?

  • 我的EJBTest有问题。 我安装了WildFly并配置了用户管理和应用程序管理。 我编写了一个EJB 3.0并进行了部署: 之后,我编写了一个简单的客户端来连接它: 用户名和密码都是应用程序用户凭据,而不是管理!对吗? 我收到以下错误: 线程“main”java中出现异常。lang.IllegalStateException:EJBClient00025:没有可用于处理调用上下文组织的[appN