当前位置: 首页 > 知识库问答 >
问题:

Spring kafka的消费者在领导者更换后不愿意使用kafka服务器

南门焱
2023-03-14

我使用的是spring-kafka 2.1.10.release。我有一个拥有next属性的消费者(复制了几乎所有的属性):

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka1.local:9093, kafka2.local:9093, kafka3.local:9093]
    check.crcs = true
    client.id = kafkaListener-0
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = kafkaLisneterContainer
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    max.poll.interval.ms = 300000
    max.poll.records = 50
    metadata.max.age.ms = 300000
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

我生产的Apache Kafka版本是2.11-1.0.0-0pan4。有一个集群内部有3个kafka的节点:

甚至无法在本地复制。事情是这样的:

4)最神秘:在应用程序中,一切都可以工作。Spring-consumer读取新消息并将它们发送给kafka。我看到了这样的日志。似乎Spring-consumer将其偏移量保存在内存中,并将commit发送给远程kafka(没有错误等):

2019-01-23 14:03:20,975+0000[kafkaLisneterContainer-0-C-1][Fetcher]调试[Consumer Clientid=KafKalisNeter-0,GroupID=KafKalisNeterContainer]分区aaa-1在偏移量164871处Fetch READ_UNCOMMITTED返回了提取数据(Error=None,HighWaterMark=164871,lastStableOffset=-1,logStartOffset=116738,abortedTransactions=null,RecordssizeInBytes=0)2019-01-23 14:03:20,

5)但Apache kafka中迟滞会增加。如果我重新启动应用程序,spring bean consumer将被重新创建,并将释放内存中保存的偏移量。它将从kafka中读取迟滞并第二次处理记录。

拜托,帮忙找钥匙!

共有1个答案

池阳伯
2023-03-14

当您启用自动提交(Kafka的默认值)时,提交完全由Kafka客户机管理,而Spring对其没有控制权。

将其设置为false将允许侦听器容器提交偏移量,默认情况下,该偏移量将在每批记录(轮询结果)之后执行,如果将容器ackmode属性设置为record则在每条记录之后执行。

当分区由于重新平衡而被撤销时,容器还将可靠地提交任何挂起的偏移量。

我通常建议不要使用自动提交。

 类似资料:
  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 当我尝试使用Kafka producer and consumer(0.9.0)脚本从一个主题推/拉消息时,我得到以下错误。 为什么我会得到这个错误,我如何解决它? 在Mac上运行Docker容器中的所有组件。ZooKeeper和Kafka分别运行在Docker容器中。 Docker计算机(boot2docker)IP地址:ZooKeeper端口:Kafka端口: 我从kafka server D

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka