当前位置: 首页 > 知识库问答 >
问题:

水平缩放spring kafka消费者应用程序

龚钧
2023-03-14

我想知道什么是相对于最大水平扩展实例数配置分区数量的好方法。

假设我有一个有6个分区的主题。

我有一个应用程序,它使用的ConnettKafkaListenerContainerFactorysetConcurrency的6.这意味着我将有6个KafkaMessageListenerContainer,每个都使用一个线程,并且均匀地消耗来自所有分区的消息

如果以上是正确的,那么我想知道如果我通过添加另一个实例水平缩放应用程序会发生什么?如果新实例具有相同的配置,并发为6,当然也具有相同的消费者组,我相信第二个实例不会消耗任何消息。因为不会发生重新平衡,因为每个现有的消费者将有一个分区分配给它。

但是,如果我们回到第一个示例,有6个分区,其中一个实例的并发性为3,那么每个使用者线程/KafkaMessageListenerContainer将分配2个分区。如果我们扩展这个应用程序(相同的用户组id和3的并发性),我相信会发生重新平衡,两个实例将分别从3个分区进行消费。

这些假设是否正确,如果不正确,你应该如何处理这种情况?

共有1个答案

夹谷烨赫
2023-03-14

一般来说,对于默认行为,您的假设是正确的,默认行为基于:

/**
 * <p>The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * <p>For example, suppose there are two consumers <code>C0</code> and <code>C1</code>, two topics <code>t0</code> and
 * <code>t1</code>, and each topic has 3 partitions, resulting in partitions <code>t0p0</code>, <code>t0p1</code>,
 * <code>t0p2</code>, <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>.
 *
 * <p>The assignment will be:
 * <ul>
 * <li><code>C0: [t0p0, t0p1, t1p0, t1p1]</code></li>
 * <li><code>C1: [t0p2, t1p2]</code></li>
 * </ul>
 *
 * Since the introduction of static membership, we could leverage <code>group.instance.id</code> to make the assignment behavior more sticky.
 * For the above example, after one rolling bounce, group coordinator will attempt to assign new <code>member.id</code> towards consumers,
 * for example <code>C0</code> -&gt; <code>C3</code> <code>C1</code> -&gt; <code>C2</code>.
 *
 * <p>The assignment could be completely shuffled to:
 * <ul>
 * <li><code>C3 (was C0): [t0p2, t1p2] (before was [t0p0, t0p1, t1p0, t1p1])</code>
 * <li><code>C2 (was C1): [t0p0, t0p1, t1p0, t1p1] (before was [t0p2, t1p2])</code>
 * </ul>
 *
 * The assignment change was caused by the change of <code>member.id</code> relative order, and
 * can be avoided by setting the group.instance.id.
 * Consumers will have individual instance ids <code>I1</code>, <code>I2</code>. As long as
 * 1. Number of members remain the same across generation
 * 2. Static members' identities persist across generation
 * 3. Subscription pattern doesn't change for any member
 *
 * <p>The assignment will always be:
 * <ul>
 * <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
 * <li><code>I1: [t0p2, t1p2]</code>
 * </ul>
 */
public class RangeAssignor extends AbstractPartitionAssignor {

但是,您可以通过分区插入任何消费者分区赋值器。分配策略消费者财产:https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy

另请参见ConsumerPartitionAssignorJavaDocs,了解更多信息及其实现,以便为您的用例做出选择。

 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我是一名Ruby/PHP web应用程序开发人员已经有一段时间了,我已经习惯了水平缩放服务器实例以处理更多请求的想法。水平缩放-意味着位于负载均衡器后面的应用程序的独立实例,它们什么都不共享,彼此不知道。 由于websocket有效地保持了浏览器和服务器之间的开放式通信线路,那么PHP/Ruby世界中典型的水平缩放架构是否会导致像链接中所解释的那样的聊天应用程序中断--因为新的websocket连

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 我已经用Apache ActiveMQ和一个简单的应用程序创建了一个JMS代理,该应用程序将消息纳入队列OK。 我想创建另一个简单的应用程序,使用MDP异步出列这些消息。以下是我到目前为止所拥有的一个例子: 现在我大概需要一个main方法,但是如果消息到达队列时监听器会异步调用onMessage方法,我不确定如何编写代码: 谢谢你的帮助。