当前位置: 首页 > 知识库问答 >
问题:

当经纪人破产时,Kafka制作人不会抛出异常

晋越彬
2023-03-14

创建了一个群集,其中有两个代理使用相同的动物园管理员,并试图为主题生成消息,其详细信息如下。

当生产者设置acks=“all”或-1时,min.insync。replicas=“2”,它应该接收代理(领导者和副本)的确认,但当一个代理在制作时手动关闭时,即使在acks=“all”有人能解释这种奇怪行为的原因时,对Kafka制作人也没有任何影响?

经纪人在9091,9092。

acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

下面是Kafka制作人的源代码

public static  void main(String k[]) {
    Properties prop=new Properties();
    prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    prop.setProperty(ProducerConfig.ACKS_CONFIG,"all");
    prop.setProperty("min.insync.replicas", "2");
    prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String,String> producer=new KafkaProducer<>(prop);
    ProducerRecord<String,String> rec=new ProducerRecord<String,String>("clust_topic","123");
while(true) {
    producer.send(rec, new Callback() {

        @Override
        public void onCompletion(RecordMetadata rm, Exception arg1) {
            System.out.println(arg1);
            if(arg1!=null)
              System.out.println(arg1);
            else
                System.out.println(rm.topic()+" "+rm.partition()+" "+rm.offset()+" ");      
        }       
    });
  }
}

共有1个答案

东郭赞
2023-03-14

ack=all意味着它需要来自所有同步副本的ack,而不是来自所有副本的ack(请参阅文档)

 类似资料:
  • 感谢你在这个问题上的帮助。 我使用的是Kafka 0.8.2 这是我写的制作人代码。

  • 我有一组Kafka代理实例作为集群运行。我有一个客户正在生产数据给Kafka: 当我们使用tcpdump进行监控时,我可以看到只有到broker1和broker2的连接被建立,而对于broker3,没有来自我的生产者的连接。我有一个只有一个分区的单一主题。 我的问题是: > 为什么在我的情况下,我无法连接到broker3?或者至少我的网络监控没有显示我的制作人与broker3建立了连接? 如果我能

  • 我在Windows子系统Linux上安装了kafka,并开始使用命令服务启动,所有服务都已启动。现在,当我尝试从Windows运行我的kafka-spring应用程序时,它显示以下错误:- 无法建立与节点-1(localhost/127.0.0.1:9092)的连接。经纪人可能不可用。 我的服务器属性是:- 我哪里出错了???

  • 我正在尝试仅为代理间kerberos配置Kafka代理。然而,由于它似乎也想通过Kerberos连接到Zookeeper,所以我似乎总是遇到错误。我目前还没有设置任何Zookeeper键。 我的Kafka代理 JAAS 配置如下: 服务器属性 我用上述配置得到的错误如下: 换句话说,我只想要经纪人到经纪人的 kerberos 和经纪人 - 动物园管理员的普通SASL_SSL。这可能吗?

  • 我知道生产者/消费者需要与经纪人交谈以了解分区的领导者。经纪人与zk交谈以告诉他们加入了集群。 是真的吗 经纪人从zk知道谁是给定分区的负责人 zk发现经纪人离开/死亡。然后重新选举领导人,并向所有经纪人发送新的领导人信息 问题: 为什么我们需要经纪人相互沟通?这只是为了让tehy可以移动分区,或者他们也可以互相查询元数据。如果是这样,元数据交换的例子是什么

  • 我有两个< code>kafka 0.10.1的代理集群,之前在我的开发服务器上正确运行< code>zookeeper 3.3.6。 我最近尝试将broker版本升级到最新的,但没有开始。配置没有太大变化 谁能告诉我可能会出什么问题吗。为什么经纪人没有起步? 已更改服务器。代理服务器1上的属性 已更改代理服务器2上的server.properties 注意: 1.Zookeeper正在两台服务器