当前位置: 首页 > 知识库问答 >
问题:

Spring-Cloud-stream消费者分区重新分配

仲智
2023-03-14

场景:

  1. 运行从名为“test”的具有10个分区的分区中消耗的Spring Boot项目。分区分配发生在13:00:00
  2. 在~13:00:30使用:
    将分区添加到主题中。/kafka-topics.sh-改变--zooKeer zooKeer: 2181-主题测试-分区100
  3. 在~13:05:30触发分区重新分配。

我运行了几次这些步骤,看起来每5分钟就有一次重新分配

  1. 是否有办法更改重新分配检查操作频率

编辑:

我的用例如下:我们有引导微服务的集成测试。当主题的使用者首先引导时,如果主题不存在并且它创建的分区数等于配置的并发数,则它会创建该主题(例如10)。然后,这个主题的生产者启动,并且他配置的partitonCount(例如20个)大于创建的分区数,因此spring cloud stream添加了丢失的分区,同时消费者分配的分区没有改变,它从前10个分区(1-10)开始一直使用。问题是,生产者正在将消息发布到所有20个分区,因此发送到最后10个分区(11-20)的消息在消费者被分配到新分区之前不会被使用
这种行为会给我们的测试带来问题,我们不能等待5分钟,直到所有分区都分配给使用者。此外,我们不希望提前创建具有所需分区数的主题,我们希望它仍然由spring cloud stream处理。

编辑2:

似乎控制“重新分配”的相关属性是metadata.max.age.ms

以毫秒为单位的一段时间,在这段时间之后,我们强制刷新元数据,即使我们没有看到任何分区领导层的变化,也会主动发现任何新的代理或分区。

共有1个答案

甄德寿
2023-03-14

所以这里有几个问题。

首先,“spring cloud stream”和/或“spring kafka”没有进行任何类型的重新平衡、分区重新分配等。这些都是在kafka内部完成的。Kafka中有一个客户端属性,如果消费者在那么长时间内没有投票,则默认为5分钟(我相信)。无论如何,我会让您访问apache Kafka频道,以获取有关Kafka内部的更多信息。

此外,添加分区、重新分配和重新平衡是代价高昂的操作,在没有认真考虑其影响的情况下,不应尝试。所以,我很想知道您不断添加分区的用例是什么?

 类似资料:
  • 在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre

  • 使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到循环分区。 s-c-s文档根本没有提到sp

  • Spring cloud stream starter kafka在连接消费者时没有加载配置。以下是我在调试模式下运行控制台时在控制台中看到的配置: 我有以下引导yml文件的配置部分

  • 我的应用程序由一个带有POST方法的REST控制器组成,用于提交我必须使用生产者发送到主题的数据。 这是控制器 使用Spring-Cloud-Stream版本 从3.1版开始,和注释被弃用,所以我尝试切换到新的方式来设置生产者,我就是这样工作的 最后在应用程序中。yaml我有这个 现在的问题是,当我启动应用程序时,方法被无限调用(我在主题中看到消息)。然后使用供应商似乎我被迫在供应商内部定义消息数

  • 如果我有3个由producer创建的分区,如果我在CF中部署3个实例,每个实例选择一个队列并使用索引处理消息,那么我可以使用cloud stream和rabbit mq开发示例消费者。 现在的问题是,如果我有10个分区,我似乎需要10个实例,这是浪费资源,我们可以让一个消费者监听多个分区吗。我之所以有基于分区的生产者,是因为对我来说,消息序列是处理事务的顺序。

  • 以下是我的情况: 我们有一个Spring cloud Stream 3 Kafka服务连接到同一个代理中的多个主题,但我想基于属性控制连接到特定主题。 每个主题都有自己的活页夹和绑定,但代理对所有人来说都是一样的。 我尝试使用下面的属性禁用绑定(这是我到目前为止找到的唯一解决方案),这适用于StreamListener不接收消息,但与主题的连接和重新平衡仍在发生。 我想知道活页夹级别是否有任何设置