你可以阅读这个Kafka文档http://kafka.apache.org/documentation/#impl_brokerregistration消费者注册算法和消费者再平衡算法
正如它所说,每个消费者在重新平衡过程中会做以下事情:
1. For each topic T that C<sub>i</sub> subscribes to
2. let P<sub>T</sub> be all partitions producing topic T
3. let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub> that consume topic T
4. sort P<sub>T</sub> (so partitions on the same broker are clustered together)
5. sort C<sub>G</sub>
6. let i be the index position of C<sub>i</sub> in C<sub>G</sub> and let N = size(P<sub>T</sub>)/size(C<sub>G</sub>)
7. assign partitions from i*N to (i+1)*N - 1 to consumer C<sub>i</sub>
8. remove current entries owned by C<sub>i</sub> from the partition owner registry
9. add newly assigned partitions to the partition owner registry
(we may need to re-try this until the original partition owner releases its ownership)
并请注意:
如果消费者比分区多,一些消费者将根本得不到任何数据。在重新平衡过程中,我们尝试将分区分配给使用者,以减少每个使用者必须连接的代理节点的数量。
好的,现在有两种重新平衡算法-Range
和RoundRobin
。它们也被称为分区分配策略。
为了简单起见,假设我们有一个带有10个分区的主题T1
,我们还有两个具有不同配置的消费者(为了更清楚起见)C1
,其中num.streams
设置为1
,而C2
中num.streams
设置为2
。
以下是如何使用范围
策略:
Range按数字顺序排列可用分区,按字典顺序排列消费线程。所以在我们的例子中,分区的顺序将是
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
,消费线程的顺序将是C1-0,C2-0,C2-1
。然后分区的数量除以消费线程的数量,以确定每个消费线程应该拥有多少个分区。在我们的例子中,它不平均划分,所以线程C1-0
将获得一个额外的分区。最终的分区分配如下:
C1-0
获取分区0,1,2,3
C2-0
获取分区4,5,6
C2-1
获取分区7,8,9
如果有11个分区,那么这些使用者的分区分配将发生一些变化:
C1-0
将获得分区0,1,2,3
C2-0
将获得分区4,5,6,7
C2-1
将获得分区8,9,10
就这样。
策略相比,这里的一个主要区别是,在重新平衡之前,你无法预测任务将是什么。下面是这将如何与相同的配置不适用于
范围RoundRobin
策略,因为它要求订阅此主题的所有消费者的num.streams
相等,所以假设两个消费者现在都将num.streams
设置为2。与循环策略一起工作:
首先,实际派遣前必须满足两个条件:
a) 每个主题在一个消费者实例中都有相同数量的流(这就是我在上面提到的每个消费者不同数量的线程将不起作用的原因)
b)组中每个消费者实例的订阅主题集都是相同的(我们这里有一个主题,所以现在这不是问题)。
当验证这两个条件时,
主题分区
对按hashcode排序,以减少将一个主题的所有分区分配给一个消费者的可能性(如果要消费的主题不止一个)。
最后,所有
主题分区
对都以循环方式分配给可用的使用者线程。例如,如果我们的主题分区最终将按如下方式排序:T1-5、T1-3、T1-0、T1-8、T1-2、T1-1、T1-4、T1-7、T1-6、T1-9
和消费者线程是C1-0、C1-1、C2-0、C2-1
,则分配如下:
转到T1-5
转到C1-0
T1-3
转到C1-1
T1-0
转到C2-0
T1-8
转到C2-1
此时已经没有更多的消费线程了,但是仍然有更多的主题分区,因此对消费线程的迭代将重新开始:T1-2
转到C1-0
T1-1
转到C1-1
T1-4
转到C2-0
<代码>T1-7C2-1
再一次:T1-6
转到C1-0
T1-9
转到C1-1
此时,所有主题分区都已分配,每个使用者线程的分区数几乎相等。
希望这有帮助。
我有一个Kafka流应用程序,它从几个主题中获取数据,并将数据加入另一个主题。 Kafka配置: 注意:我在运行Kafka Brokers的机器上运行Kafka Streams应用程序。 每小时消耗/产生数百万条记录。每当我让Kafka经纪人倒下时,都会进入再平衡阶段,再平衡大约需要30分钟,有时甚至更长时间。 有人知道如何解决Kafka消费者的再平衡问题吗?而且,很多时候,它在重新平衡时抛出异常
我们正在运行一个3 broker Kafka 0.10.0.1集群。我们有一个java应用程序,它产生了许多消费线程,从不同的主题消费。对于每一个主题,我们都指定了不同的消费者群体。 很多时候,我看到每当这个应用程序重新启动时,一个或多个CG需要超过5分钟来接收分区分配。在此之前,这个话题的消费者不会消费任何东西。如果我去Kafka broker并运行Consumer-Groups.sh并描述特定
在消费者重新平衡期间如何确保消息排序。假设最初我们有四个分区:p1、p2、p3、p4和两个消费者c1和c2(在同一组中)。因此每个消费者得到两个分区,例如c1 : p1,p2和c2 : p3,p4。 现在添加了新的消费者,比如c3和c4,重新平衡发生,这样每个消费者都有一个分区,比如c1: p1、c2: p2、c3: p3、c4: p4。 在此期间,消费者c1可能正在处理来自分区p2的消息(在重新
当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了
当一个新的消费者/borker被添加或下降时,Kafka会触发一个再平衡操作。Kafka是在重新平衡封锁行动。Kafka的消费者是不是在一个再平衡操作正在进行的时候就被封锁了?
我对再平衡有些怀疑。现在,我正在手动将分区分配给使用者。因此,根据文件,如果消费者离开/崩溃在一个消费群体中,就不会有再平衡。 假设同一组中有3个分区和3个使用者,每个分区都是手动分配给每个使用者的。一段时间后,第三个消费者倒下了。既然没有再平衡,我可以采取什么措施来确保停机时间最小化?我是否需要更改前两个分区中任何一个的配置,以从第三个分区或其他分区开始使用?