当前位置: 首页 > 知识库问答 >
问题:

当主Kafka代理死亡时,ISR不会扩展以维护复制

太叔景曜
2023-03-14

我正在测试Kafka的弹性(apache;Kafka2.12-1.1.0)。我所期望的是,当某个节点崩溃时,某个主题的ISR应该增加它自身(即复制到可用节点)。我花了4天时间在谷歌上搜索可能的解决方案,但毫无用处。

拥有3个节点集群,并使用docker(wurstmeister)在服务器中更新了以下内容,在集群上创建了3个代理、3个动物园管理员(1node=1broker 1 zookeeper)。性质

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
min.insync.replicas=2
default.replication.factor=3

启动所有经纪人;等了一会儿;已使用replication3创建主题,同步复制2的分钟数

bin/kafka-topics.sh --create --zookeeper 172.31.31.142:2181,172.31.26.102:2181,172.31.17.252:2181  --config 'min.insync.replicas=2' --replication-factor 3 --partitions 1 --topic test2

当我描述这个主题时,我看到了下面的数据

bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test2
Topic:test2     PartitionCount:1        ReplicationFactor:3     Configs:min.insync.replicas=2
        Topic: test2    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

到目前为止一切顺利,现在我开始讨论;紧随其后的是制片人。当消费处于全速时,我杀了经纪人。现在,当我描述同一主题时,我看到了以下内容([Edit-1])

bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test2
Topic:test2     PartitionCount:1        ReplicationFactor:3     Configs:min.insync.replicas=2
        Topic: test2    Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3,1

bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic __consumer_offsets
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer Topic: __consumer_offsets       Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,3
        Topic: __consumer_offsets       Partition: 1    Leader: 3       Replicas: 2,3,1 Isr: 1,3
    .. .. .. 

[编辑结束-1]

我让Kafka制作人,消费者继续几分钟;问题1:当代理2关闭时,为什么副本仍然显示2?

现在,我又向集群添加了2个代理。生产商、消费者继续关注ISR;ISR复制副本的数量不会增加,仅为3,1。问题2:为什么ISR没有增加,即使还有两个经纪人?。

然后我停止了生产者,消费者;等了几分钟;再次运行descripe命令——结果仍然相同。ISR何时扩展其复制?。如果还有2个节点可用,为什么ISR不复制?

我对我的制作人的评价如下

props.put("acks", "all");
props.put("retries", 4);
props.put("batch.size", new Integer(args[2]));// 60384
props.put("linger.ms", new Integer(args[3]));// 1
props.put("buffer.memory", args[4]);// 33554432
props.put("bootstrap.servers", args[6]);// host:port,host:port,host:port etc
props.put("max.request.size", "10485760");// 1048576

及消费者资料如下:

props.put("group.id", "testgroup");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", args[2]);// 1000
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("max.partition.fetch.bytes", args[3]);// 52428800
    props.put("fetch.max.bytes", args[4]);// 1048576
    props.put("fetch.message.max.bytes", args[5]);// 1048576
    props.put("bootstrap.servers", args[6]);
    props.put("max.poll.records", args[7]);
    props.put("max.poll.interval.ms", "30000");
    props.put("auto.offset.reset", "latest");

在另一个实验中,当我删除另一个代理时,我开始看到同步复制的总数小于所需的最小值的错误。令人惊讶的是,在这个州,生产商没有被封锁;但是我在代理服务器上看到了错误。日志没有新邮件进入队列。问题4:生产商不应该被封锁吗?而不是在代理端抛出错误?还是我的理解错了?

需要帮忙吗?

共有2个答案

戴嘉珍
2023-03-14

复述副本的含义:所有分区副本都是副本,甚至是领导者的副本;换句话说,2个副本意味着你有领导者和一个追随者。

当您描述主题时,对于您唯一的分区,您会看到:“副本:2,3,1 Isr:3,1”,这意味着在创建主题时,前导分区被分配给代理2(副本列表中的第一个),追随者被分配给代理3和1;现在,代理2是该分区的“首选领导者”。

此任务本身不会更改(领导者可能会更改,但“首选领导者”不会更改),因此您的追随者不会移动到其他代理,只能将领导者角色分配给另一个同步副本。(有一个属性auto.leader.REBANCE.enable,如果该属性设置为true,将允许领导者角色在再次启动时返回到首选领导者,否则领导者角色将由新当选的领导者保留。。。

下一次尝试杀死领导者经纪人,你会看到一个新的领导者将被选举和使用,但“副本: 2,3,1”将留下来。

如果您设置复制因子=3 acks=all和min.insync.replicas=2,您可以产生只要2个副本确认写入(领导者和一个追随者),但如果不可能维护3个ISR,则会在代理上获取日志。...

希望这有助于。。。

赫连子石
2023-03-14

如果我理解正确的话,Kafka不会在添加代理时自动重新平衡。除非使用重新分区工具,否则不会重新分配关闭的复制副本

目前还不清楚您的环境之间有什么区别,但看起来您并没有真正杀死一个仍被列为领导者的经纪人。

如果您有两个代理,最小ISR为2,那么,是的,您将看到错误。但是,制作人应该仍然能够联系到至少一个代理,因此我认为它不会被完全阻止,除非您将ack值设置为all。代理端的错误更多地与放置副本有关

 类似资料:
  • 因此,我是Spring集成的新手,主要也是Spring的新手,所以我可能不熟悉所有术语,但我遇到了以下场景: 我有一个带有三个SI流的小型Spring集成应用程序。。。每个流都有自己的网关,每个网关都有自己的请求通道和应答通道。这些流接收一个空调用(用于所有密集用途…基本上只是一个“GO”信号/空消息),并根据(琐碎的)业务逻辑结果用状态消息进行回复。 现在,我想将这些流连接在一起,在一个“主流”

  • 我的Java应用程序生成了一个填充JasperReports报表的pdf文件,但当我打开pdf文件时,它的字体总是“Arial”,在报表中设置为“DeJavu Sans”。 当我使用JasperReports函数导出到pdf文件时,一切正常(之后,我导入了DejaVu Sans的.ttf文件)。只有当我使用Java应用程序时,问题才会持续存在。 有人有同样的问题吗?

  • 我们已经实现了延迟消息处理,有2个队列和x-死信-交换/x-消息-ttl,在queue1中的消息超时后,它将转到queue2。 现在是否有可能设置RabbitMQ,以便如果在处理来自queue2的消息时,我们将其拒绝为“死信”,那么它将自动转到queue3?我担心的是queue2中的消息已经标记为“已死”,有没有办法区分那些因为被拒绝而死的消息,并自动只将那些放在队列3中?

  • 问题内容: 我有一个程序生成并与CPU繁重,不稳定的进程通信,而不是由我创建的。如果我的应用程序崩溃或被杀死,我也希望子进程也被杀死,因此用户不必跟踪它们并手动杀死它们。 我知道以前已经讨论过该主题,但是我已经尝试了所有描述的方法,但似乎没有一种方法能够经受住测试的考验。 我知道这是有可能的,因为终端一直在这样做。如果我在终端中运行某些程序并杀死该终端,则这些东西总是会死掉。 我试过了,双叉和。不

  • 因此,我有一个主线程,它产生了一堆“工作线程”,在整个过程中与它一起工作。我想要的是,如果一个工作线程死于异常或其他什么,主线程也应该抛出一个运行时异常并平静地死掉。 我想要的是,如果一个工作线程死于异常或其他什么,主线程也应该抛出一个运行时异常并平静地死掉,而不使用标志,但要“自动”完成

  • 扩展说明 将 Invoker 接口转换成业务接口。 扩展接口 org.apache.dubbo.rpc.ProxyFactory 扩展配置 <dubbo:protocol proxy="xxx" /> <!-- 缺省值配置,当<dubbo:protocol>没有配置proxy属性时,使用此配置 --> <dubbo:provider proxy="xxx" /> 已知扩展 org.apache.