当前位置: 首页 > 知识库问答 >
问题:

使用Spark Streaming时限制Kafka批大小

邢臻
2023-03-14

是否可以限制Kafka消费者返回Spark Streaming的批的大小?

我这么问是因为我得到的第一批记录有上亿条记录,处理和检查它们需要很长时间。

共有1个答案

公孙驰
2023-03-14

我想你的问题可以通过火花流背压来解决。

检查spark.streaming.backpressure.enabledspark.streaming.backpressure.initialrate

默认情况下,spark.streaming.backpressure.initialrate未设置,spark.streaming.backpressure.enabled默认情况下禁用,所以我想spark将尽可能多地使用。

来自Apache Spark Kafka配置

spark.streaming.backpressure.enabled:

这使得火花流能够基于当前批调度延迟和处理时间来控制接收速率,使得系统仅以系统能够处理的速度接收。在内部,这动态设置接收器的最大接收速率。此速率的上限为spark.streaming.receiver.maxratespark.streaming.kafka.maxratePerpartition值(如果已设置)(见下文)。

spark.streaming.backpressure.initialrate:

这是初始最大接收速率,当背压机制启用时,每个接收器将接收第一批数据

当您的Spark工作(分别是Spark workers)能够处理来自kafka的10000条消息,但kafka经纪人给您的工作提供了100000条消息时,这是很好的。

也许您还会有兴趣查看spark.streaming.kafka.maxrateperpartition,以及Jeroen van Wilgenburg在其博客上对这些属性的一些研究和建议。

 类似资料:
  • Kafka大约有5000万张唱片库存(即将消耗)。主题是3个分区。 我的消费应用程序: 我限制了spark streaming的消耗大小,在我的例子中,我将设置为10000,这意味着在我的例子中,它每批消耗30000条记录。 有什么方法可以让spark streaming在每个批处理中提交? Spark streaming日志,证明它每批消耗的记录num:

  • 我用的是Kafka1.0,我增加了批量。大小=100K,用于优化我的制作人性能。但我发现,无论我设定的批次是什么,都没有任何效果。尺寸=100K或1000K或仅1K。此外,我还设定了我的逗留时间。ms=5,但这使性能更差。当我调试Kafka producer的源代码时,如下所示: 我发现了结果的价值。纽巴奇总是正确的,我想这就是为什么这一批。大小没有起任何作用,因为它每次都会唤醒发送者,而不是在b

  • 我正在为Kafka和SparkStreaming编写一些代码,当我将它们放在Yarn-Cluster上时,它报告了。 但它在我的电脑上运行良好(独立模式) 那它有什么问题呢? //这是代码 这里例外----------------------------------- 19/07/26 18:21:56警告Scheduler.TaskSetManager:在stage 0.0中丢失任务0.0(TI

  • partition/data只有15G,kafka日志文件夹是-/data/var/kafka/kafka-logs data/var/kafka/kafka-logs下的大多数文件夹大小为4K-40K 但两个文件夹的大小非常大--5G-7G,这导致/数据是100%

  • 问题内容: 我正在用multer创建一个简单的文件上传系统: 一切正常,文件被上传。唯一不起作用的是最大大小的限制。我这样做是为了检查文件onfileupload start的大小,如果文件过大,它将返回false。但是文件仍然只是上传。 似乎根本没有做任何事情。我尝试了一些东西,但是什么也没有。 我究竟做错了什么?使用multer上载时如何限制文件大小? 问题答案: 新的API 没有。如果要限制

  • 问题内容: 最好使用写入时,我需要将文件大小限制为1 GB 。 是否可以使用或必须使用其他库? 喜欢 问题答案: 您总是可以自己写,以限制写的 字节 数。 下面假定如果超出大小,则要引发异常。 当然,您现在必须手动设置/ 链。