当前位置: 首页 > 知识库问答 >
问题:

防止多个活动批处理火花流

拓拔泉
2023-03-14

我正在运行一个spark作业,流上下文每60秒运行一次。问题是一批处理时间太长(由于计算和保存RDD和Parquet到云存储),一批无法在1分钟内完成。它结束于下一批继续进入并成为活动的(状态=处理)。过了一段时间,我有10个活动批处理,而第一个已经完成。结果,它明显减慢,没有一批能够完成。是否存在严格限制一次活动批处理的数量为1。

多谢了。

共有1个答案

鱼安然
2023-03-14

可以使用属性spark.streaming.backpressure.enabled=true禁用背压。这将确保后续批处理处于队列状态,并且当前只有1个活动批处理正在运行。

 类似资料:
  • 我正在处理UDF中的空值,该UDF在数据帧(源自配置单元表)上运行,该数据帧由浮点数结构组成: 数据帧()具有以下架构: 例如,我想计算x和y的总和。请注意,我不会在以下示例中“处理”空值,但我希望能够在我的udf中检查、或是否。 第一种方法: 如果<code>struct是否为空,因为在scala中<code>浮点不能为空。 第二种方法: 这种方法,我可以在我的udf中检查是否为空,但我可以检查

  • 一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?

  • 当前设置:Spark流作业处理timeseries数据的Kafka主题。大约每秒就有不同传感器的新数据进来。另外,批处理间隔为1秒。通过,有状态数据被计算为一个新流。一旦这个有状态的数据穿过一个treshold,就会生成一个关于Kafka主题的事件。当该值后来降至treshhold以下时,再次触发该主题的事件。 问题:我该如何避免这种情况?最好不要切换框架。在我看来,我正在寻找一个真正的流式(一个

  • 我正在尝试在GCP数据流中运行批处理作业。工作本身有时会占用大量内存。目前,工作一直在崩溃,因为我相信每个工作人员都在试图同时运行pcollection的多个元素。有没有办法防止每个工人一次运行多个元素?

  • 我有一个用例,我必须以FIFO方式处理事件。这些是从机器生成的事件。每台机器每30秒生成一个事件。对于特定的机器,我们需要根据FIFO FASION对事件进行处理。 我们每天需要处理大约2.4亿个事件。对于如此大的规模,我们需要使用Kafka+火花流 从Kafka文档中,我了解到我们可以使用消息的关键字段将消息路由到特定的主题分区。这确保我可以使用机器id作为密钥,并确保来自特定机器的所有消息都进

  • 创建了具有3个分区的主题 创建StreamingContext时将批处理持续时间设置为10秒 以纱线模式运行,有2个执行程序(3个分区共4个内核) 现在我如何测试它是否起作用。 我有一个制作人,一次发送60000条消息到这个主题。当我检查spark UI时,我得到以下信息: