当前位置: 首页 > 知识库问答 >
问题:

火花流句柄 倾斜的Kafka分区

程冥夜
2023-03-14

场景:
Kafka-

每个火花流微批次中的逻辑(30秒):< br >读取Json-

我的流媒体工作是阅读大约1000个Kafka主题,大约有10K个Kafkapartitions,吞吐量大约为500万事件/秒。

问题来自 Kafka 分区之间的流量负载不均匀,一些分区的吞吐量大约是较小分区的 50 倍,这会导致 RDD 分区倾斜(因为 KafkaUtils 创建了从 Kafka 分区到 Spark 分区的 1:1 映射)并真正损害了整体性能,因为对于每个微批处理,大多数执行器都在等待负载最大的执行器完成, 我通过查看 Spark UI 知道这一点,在每个微批处理的某个点,只有少数执行程序具有“活动”任务,所有其他执行程序都完成了他们的任务并等待,也通过查看任务时间分布,MAX 是 2.5 分钟,但中位数只有 20 秒。

注意事项:

    < li>Spark流而非结构化流 < li >我知道这个post Spark-repartition()vs coalesce(),我不是问re partition()或coalesce()之间的区别,负载是一致的,因此与自动缩放或动态分配也无关

我尝试过:

  1. Coalesce() 有一点帮助,但不能消除偏度,有时甚至更糟的是,在执行器上也给 OOM 带来了更高的风险。
  2. Repartition() 确实消除了偏度,但在这个规模上完全洗牌太昂贵了,惩罚不会回报每个批次的执行时间,增加批处理时间也不起作用,因为当批处理时间增加时,每个微批处理的负载增加,洗牌的工作量也会增加

如何使工作负载更均匀地分布在Spark执行器之间,以便更有效地使用资源?性能会更好吗?

共有2个答案

云隐水
2023-03-14

实际上你已经给出了自己的答案。

不要从1000个主题中读取1个流式作业。将负载最大的任务放入单独的流式作业中。重新配置,就这么简单。负载平衡,排队理论。

落伍者在Spark中是一个问题,尽管落伍者在Spark中具有略有不同的特征。

宇文鸿畴
2023-03-14

我有同样的问题。您可以尝试 Spark 2.4.7 中的 minPartitoin 参数

有几件重要的事情需要强调。

  • 默认情况下,一个 Kafka 分区映射到 1 个 Spark 分区,或者几个从 Spark 映射到 Kafka 中的一个分区。
  • Kafka 数据帧具有每个分区的开始和结束边界。
  • Kafka Dataframe maxMessagePerTrigger 定义从 kafka 读取的多个消息。
  • 从Spark 2.4.7开始,还支持minParrtions参数,可以根据偏移量范围将一个Kafka分区绑定到多个Kafka分区。默认情况下,它会尽最大努力均匀地拆分 Kafka 分区(偏移范围)。

因此,使用< code > min partitions 和< code > maxoffsetsperrigger 可以预先计算出大量分区。

.option("minPartitions", partitionsNumberLoadedFromKafkaAdminAPI * splitPartitionFactor)
.option("maxOffsetsPerTrigger", maxEventsPerPartition * partitionsNumber)

html" target="_blank">配置中定义的< code > maxEventsPerPartition 和< code > splitPartitionFactor 。

在我的例子中,有时我有数据峰值,我的消息大小可能非常不同。所以我实现了我自己的流源代码,它可以按精确的记录大小拆分kafka分区,甚至在一个火花上合并几个kafka部分。

 类似资料:
  • 我有一些关于Kafka主题分区->spark流媒体资源利用的用例,我想更清楚地说明这些用例。 我使用spark独立模式,所以我只有“执行者总数”和“执行者内存”的设置。据我所知并根据文档,将并行性引入Spark streaming的方法是使用分区的Kafka主题->RDD将具有与Kafka相同数量的分区,当我使用spark-kafka直接流集成时。 因此,如果我在主题中有一个分区和一个执行器核心,

  • 我想知道Kafka分区是如何在从executor进程内部运行的SimpleConsumer之间共享的。我知道高水平的Kafka消费者是如何在消费者群体中的不同消费者之间分享利益的。但是,当Spark使用简单消费者时,这是如何发生的呢?跨计算机的流作业将有多个执行程序。

  • 为什么以及何时会选择将Spark流媒体与Kafka结合使用? 假设我有一个系统通过Kafka每秒接收数千条消息。我需要对这些消息应用一些实时分析,并将结果存储在数据库中。 我有两个选择: > < li> 创建我自己的worker,该worker从Kafka读取消息,运行分析算法并将结果存储在DB中。在Docker时代,只需使用scale命令就可以轻松地在我的整个集群中扩展这个工作线程。我只需要确保

  • 它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka

  • 问题内容: 有谁知道如何用scipy绘制偏态正态分布?我认为可以使用stats.norm类,但我不知道如何使用。此外,如何估计描述一维数据集偏斜正态分布的参数? 问题答案: 根据Wikipedia的描述, 如果你想找到一个数据集的使用规模,位置和形状参数,例如使用,并且, 应该给你类似的东西,

  • 我不能用火花流运行Kafka。以下是我迄今为止采取的步骤: > 将此行添加到- Kafka版本:kafka_2.10-0.10.2.2 Jar文件版本:spark-streaming-kafka-0-8-assembly_2.10-2.2.0。罐子 Python代码: 但我仍然得到以下错误: 我做错了什么?