当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka多并发使用者-提交偏移量失败

东方栋
2023-03-14

我正在使用spring with Kafka来消费来自Kafka主题的数据。我已经将并发配置为10。因此不同的线程轮询代理以获取消息并处理消息。即使在一段时间后(成功处理),我们也会收到相同的消息返回给使用者的不同线程。我们能够在配置的max.poll.interval.ms=1500000内处理接收到的消息。

请找到以下配置的Kafka消费者属性。我已经通过Kafka配置了自动提交。

    group.id=ips-request-group //group id
    enable.auto.commit=true // auto commit
    auto.commit.interval.ms=60000 // auto commit inverval
    session.timeout.ms=100000 // session time out
    request.timeout.ms=610000 // request time out
    fetch.max.wait.ms=500  //polling time out
    heartbeat.interval.ms=20000 // heart beat interval
    auto.offset.reset=latest  //consuming latest messages.
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer // key
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer //value
    max.poll.records=10 // max polling records
    max.poll.interval.ms=1500000 // max polling interval ms /*

你能帮我解决Kafka消费者重复接收消息的问题吗。

共有1个答案

鲁望
2023-03-14

在执行每个记录后提交偏移量;设置auto.commit.enabled=false并将容器属性ackmode设置为ackmode.record

 类似资料:
  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我有一个关于Kafka自动提交机制的问题。我正在使用启用自动提交的Spring-Kafka。作为一个实验,我在系统空闲(主题中没有新消息,没有正在处理的消息)的情况下,将我的消费者与Kafka的连接断开了30秒。重新连接后,我收到了如下几条消息: 第一,我不明白有什么好犯的?系统空闲(所有以前的消息都已提交)。第二,断开时间为30秒,比max.poll.interval.ms的5分钟(300000

  • 我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。 由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量? 谢谢。 我正在使用的自定义错误处理程序: }

  • 我正在使用来探讨一个Kafka主题。 断断续续地,我会收到以下错误消息,然后是2个警告: 它在警告日志中建议: 这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减小poll()中返回的批的最大大小来解决这一问题。 因此,我需要增加或减少。

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用