当前位置: 首页 > 知识库问答 >
问题:

发生故障时Spring Kafka自动提交偏移量

景令秋
2023-03-14

我使用的是Spring Kafka 1.2.2版。我有一个Kafka Listener作为消费者,它监听一个主题并在弹性中索引文档。我的自动提交偏移量属性设置为true//default。

共有1个答案

司寇望
2023-03-14

我更愿意将其设置为false;由容器为您管理偏移量更可靠。

将容器的ackmode设置为record(默认为batch),容器将在侦听器返回后为您提交偏移量。

还要考虑至少升级到1.3.3(当前版本为2.1.4);1.3.x引入了一个简单得多的线程模型,这要归功于KIP-62

使用自动提交,无论成功/失败,都将提交偏移量。容器在失败后不会提交,除非ackonerror为true(不使用自动提交的另一个原因)。

但是,这仍然没有帮助,因为代理不会再次发送相同的记录。为此,您必须对使用者执行查找操作。

在2.0.1(当前版本为2.1.4)中,我们添加了SeekToCurrEnterrorHandler,它将导致在下一次轮询时重新发送失败和未处理的记录。参见参考手册。

对于旧版本(>=1.1),您必须使用ConsumerSeekAware侦听器,这要复杂得多。

另一种选择是添加重试,以便根据重试设置重新尝试传递。

 类似资料:
  • 我是卡珊德拉的新成员。 我在两台 Debian VMware 机器上创建了 2 个 cassandra 2.1 节点。在 asp.net mvc 中,我使用了 datastax 驱动程序 2.1.5,实际上没有任何问题,但是当我关闭或禁用其中一个节点上的网络时,应用程序似乎有 5 或 10 秒的延迟来自动连接其他节点。 当两个节点启动时,查询在c00:00:00.0620413秒内运行,当一个节点

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我正在尝试用6台机器实现一个Redis集群。我有一个由六台机器组成的流浪集群: 运行redis服务器 我编辑了上述所有服务器的/etc/redis/redis.conf文件,添加了这个 然后我在六台机器中的一台上运行了这个程序; Redis集群已启动并运行。我通过在一台机器上设置值手动检查它显示在其他机器上。 我的问题是,当我关闭或停止任何一台主机上的redis server时,整个集群都会停止运

  • 我有一个关于Kafka自动提交机制的问题。我正在使用启用自动提交的Spring-Kafka。作为一个实验,我在系统空闲(主题中没有新消息,没有正在处理的消息)的情况下,将我的消费者与Kafka的连接断开了30秒。重新连接后,我收到了如下几条消息: 第一,我不明白有什么好犯的?系统空闲(所有以前的消息都已提交)。第二,断开时间为30秒,比max.poll.interval.ms的5分钟(300000

  • 问题内容: 在简单情况下,如果3台服务器具有1个主服务器和2个从属服务器而没有分片。是否有使用Java和Jedis的经过验证的解决方案,该解决方案没有单点故障,并且将自动处理单个服务器(无论是主服务器还是从服务器)(自动故障转移)。例如,提升主机并在故障后重置,而不会丢失任何数据。 在我看来,这似乎应该是一个已解决的问题,但是我找不到关于它的任何代码,而仅是对实现此方法的高级描述。 谁实际覆盖并在

  • 我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。 由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量? 谢谢。 我正在使用的自定义错误处理程序: }