当前位置: 首页 > 知识库问答 >
问题:

Spring kafka-带有AckMode记录和自定义SeekToCurrent错误处理程序的消息丢失

郭弘方
2023-03-14

侦听器在循环中接收消息,等待瞬态问题得到解决,如预期的那样,但在某些情况下消息丢失。

我使用的是spring kafka 2.2.5,请参阅有关kafka的spring boot配置

spring:
  kafka:
    bootstrap-servers: <HOST>:<PORT>
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      group-id: ConsumerGroup
      properties:
        max.poll.records: 1
    listener:
      ack-mode: record
      concurrency: 1

据我所知,应用程序在以下情况下会丢失消息:

  • 消息侦听器错误处理程序未启动/处理的异常会传播到KafCamessageListenerContainer。容器属性ackOnError的默认值为true,因此容器提交偏移量。如果重新启动侦听器,则消费者组中的任何其他客户端都不会再处理消息,因为偏移量已经提交并向前移动
  • 不会将异常/错误传播到容器,但侦听器会在前面描述的瞬时错误处理循环的中间重新启动。与前一种情况一样,由于容器在第一次轮询期间提交了偏移量,消费者组中的客户端不会重新捕获消息

因此,为了解决第一个问题,将ackOnError设置为false就足够了。但对于第二个问题,我开始怀疑AckMode记录在这种情况下是否正确,因为消息在每种情况下都已提交。

在这种情况下,我是应该立即使用手动_,还是忽略了要点?

非常感谢,亲切问候

共有1个答案

师承弼
2023-03-14

ackOnError自2.3版起默认为false;它从2.4开始就被弃用(支持GenericErrorHandler.isAckAfterHandle()),并在master(未来的2.7)上被删除。

由于偏移量已在第一次轮询期间由容器提交。

我不确定你的意思。如果您引用的是重新平衡侦听器中的逻辑,如果重新启动侦听器,您将获得一个新的消费者,因此它的位置()将是最后一个提交的偏移量。

从版本2.3.6开始,您可以通过将赋值promise选项设置为NEVER来完全禁用该逻辑。

2.2.x是生命的终结(与Boot 2.1一起),将不会有更多的版本。最后的2.2。x版本是2.2.14。2.2.5岁近2岁;你应该尽量多了解最新情况。

最新版本是2.6.3。

 类似资料:
  • 问题内容: 如何使用 自定义* 错误处理程序处理 解析 和 致命 错误? * 问题答案: 简单答案:不能。参见手册: 用户定义的函数无法处理以下错误类型:E_ERROR,E_PARSE,E_CORE_ERROR,E_CORE_WARNING,E_COMPILE_ERROR,E_COMPILE_WARNING,以及在调用set_error_handler()的文件中引发的大多数E_STRICT。 对

  • 我们使用高图表在单个超文本标记语言页面上绘制多个图表。 然而,一个/一些图表抛出highchart错误,我们喜欢捕捉这些错误并向用户显示不同的错误。为此,highcharts确实提供了自定义错误处理程序。但这个自定义错误处理程序不提供有关抛出该错误的特定图表的信息。 这里是highcharts提供的JS Fiddle,它适用于图表: http://jsfiddle.net/gh/get/libra

  • 安全测试人员声称,我应该清理返回的JSON(即转义这些符号),因为这可能会给旧的浏览器带来一些问题(即在浏览器中执行此JS代码)。 但是生成错误消息的是SpringBoot框架, 我在这里没有太多的控制权。 当然,我可以将参数定义为String,并自己进行验证,但我怀疑这是否是正确的方法。我的参数定义为Integer,我希望它保持这种方式。 做这件事最简单的方法是什么?

  • 所有的错误最终都会被 Tango.ErrHandler 进行处理。 你可以自定义你的错误处理方式来替代默认的。例如: var ( prefix = "<html><head>tango</head><body><div>" suffix = fmt.Sprintf("</div><div>version: %s</div></body></html>", tango.Version

  • 404和500错误客户端和服务端都会通过error.js组件处理。如果你想改写它,则新建_error.js在文件夹中: import React from 'react' export default class Error extends React.Component { static getInitialProps({ res, err }) { const statusCod

  • 我目前正在使用Spring Kafka来使用topic中的消息以及Spring的@Retry。因此,基本上,我正在尝试处理消费者消息以防出现错误。但在这样做时,我希望避免KafkaMessageListenerContainer抛出的异常消息。相反,我想显示一条自定义消息。我尝试在ConcurrentKafkAlisterContainerFactory中添加错误处理程序,但这样做时,我的重试没有