当前位置: 首页 > 知识库问答 >
问题:

Kafka重新分区(用于基于键的分组)

都阳辉
2023-03-14

当我们基于某个键在流上应用组 by 函数时,kafka 如何计算这一点,因为相同的键可能存在于不同的分区中?我看到了()函数,它基本上对数据进行了重新分区,但我不明白它是什么意思。它会将具有相同键的所有消息移动到单个分区中吗?另外,我们可以通过()方法调用的频率如何?如果有要求,我们可以在收到每条消息后调用它吗?请建议。谢谢

共有1个答案

尹冠宇
2023-03-14

Kafka中的数据(默认情况下)总是按键进行分区。如果您调用< code>groupBy(),分组属性将被设置为消息键,因此,当数据被写入重新分区主题时,具有相同键的所有记录都将被写入同一分区。因此,当读回数据时,可以在< code>aggregate()函数中正确计算聚合。

请注意,Kafka Streams会自动执行此重新分区(包括创建所需主题)。调用repartition()(或through())也可以达到同样的效果,但这不是必需的。

另请注意,Kafka流程序是数据流程序。使用 DSL 时,只需指定数据流程序本身,但尚未处理任何内容。只有当您调用 KafkaStreams#start() 时,才会执行数据流程序。

 类似资料:
  • 我有以下卡桑德拉表格: 每天早上,聚合应用程序应该加载前一天的数据,并从快照列聚合JSON数据。聚合将按地理哈希对数据进行分组,这就是为什么它的部分被选为分区键的一部分。 我知道它是有效的加载数据从Cassandra通过使用join的Cassandratable-但为此我必须得到RDD构造从(created_date,geo_part)对。虽然我知道created_date值,我不能列出geo_p

  • 我有一些关于Kafka主题分区->spark流媒体资源利用的用例,我想更清楚地说明这些用例。 我使用spark独立模式,所以我只有“执行者总数”和“执行者内存”的设置。据我所知并根据文档,将并行性引入Spark streaming的方法是使用分区的Kafka主题->RDD将具有与Kafka相同数量的分区,当我使用spark-kafka直接流集成时。 因此,如果我在主题中有一个分区和一个执行器核心,

  • 关于Kafka,我有以下几个问题: > 如果我创建一个主题,并且指定的分区数多于代理数,那么单个代理将处理多个分区? 如果我创建了一个主题,并且指定的复制因子大于没有代理,那么该主题会创建还是不会创建? 一个代理可以处理不同主题的多个分区。

  • 当一个DLQ被设置为一个Spring云流Kafka消费者时,DLQ写入的主题可以被分区吗?我有一个要求,使密钥等于一个特定的字段,我想知道这将如何与Spring云流。

  • 我的Kafka Streams应用程序正在使用以下键值布局的kafka主题:

  • 我想知道,在什么情况下,具有相同分区键的消息会进入不同的分区。 我使用下面给出的命令运行了属于同一组的两个消费者在控制台中监听一个主题: 我使用“纳米/Kafka-php”库将消息放入带有键 的主题。当我发送多个这样的消息时,我发现很少有消息转到第二个消费者,而大多数消息都发送给消费者1。 由于我对所有消息使用相同的密钥,因此我希望所有消息都由同一个使用者使用。每个使用者都绑定到每个分区。 我使用