当前位置: 首页 > 知识库问答 >
问题:

spark streaming中的Kafka限制数据消耗

郎恺
2023-03-14

我正在做星火流媒体项目。从Kafka那里得到数据。我想限制Spark-Streaming消耗的记录。关于Kafka的资料非常多。我已经使用spark.streaming.kafka.MaxRatePerPartition=1属性来限制Spark中的记录。但在5分钟的批处理中,我收到了13400条消息。我的星火程序每5分钟不能处理超过1000条消息。Kafka主题有三个分区。我的spark驱动程序内存是5GB,有3个执行器,每个3GB。如何限制Kafka在spark Streaming中的消息消耗。

共有1个答案

卫才哲
2023-03-14

你试过在道具下面设置吗?

spark.streaming.backpressure.enabled
spark.streaming.backpressure.initialRate
 类似资料:
  • 问题内容: 我正在通过migrations.changeColumn函数在迁移中添加一个约束。 添加约束是可行的,但是由于您需要提供Possibly unhandled SequelizeDatabaseError: relation “myAttribute_unique_idx” already exists`。 (使用的数据库是postgres) 我也尝试过使用removeIndex 但是在

  • 我正在为Kafka和SparkStreaming编写一些代码,当我将它们放在Yarn-Cluster上时,它报告了。 但它在我的电脑上运行良好(独立模式) 那它有什么问题呢? //这是代码 这里例外----------------------------------- 19/07/26 18:21:56警告Scheduler.TaskSetManager:在stage 0.0中丢失任务0.0(TI

  • 我刚开始使用Spark streaming并尝试运行本教程中的一个示例,我正在跟踪制作并运行我们自己的NetworkWordCount。我已经完成了第8步,并从SBT制作了一个罐子。 现在我正在尝试使用第9步中的命令运行deploy my jar,如下所示: 我创建的jar包含“NetworkWordCount”类,该类具有来自spark示例的以下代码 我无法确定我做错了什么。

  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费

  • Sphinx/Coreseek索引的源数据有一些限制,其中最重要的一条是: 所有文档的ID必须是唯一的无符号非零整数(根据Sphinx构造时的选项,可能是32位或64位) 如果不满足这个要求,各种糟糕的情况都可能发生。例如,Sphinx/Coreseek建立索引时可能在突然崩溃,或者由于冲突的文档ID而在索引结果中产生奇怪的结果。也可能,一只重达1000磅的大猩猩最后跳出你的电脑,向你扔臭蛋。我告

  • 但是Flink医生说: 在启用Flink检查点的情况下,Flink Kafka使用者将使用来自主题的记录,并以一致的方式定期检查其所有的Kafka偏移量以及其他操作的状态。在作业失败的情况下,Flink会将流程序恢复到最新检查点的状态,并从检查点中存储的偏移量开始重新使用来自Kafka的记录。 阅读其他来源,我猜Flink检查点将保存程序的状态以及消耗的偏移量,但Spark检查点只是保存消耗的偏移