当前位置: 首页 > 知识库问答 >
问题:

Kafka consumer在处理某些消息时失败

东方文林
2023-03-14

我有一个spring boot应用程序与单个Kafka消费者从一些主题获取消息。但有时在处理消息时会出现错误。

我理解我需要禁用自动提交并手动提交成功的消息,但是,在这种情况下,如果我没有为这个异常情况抛出任何异常,并手动提交每个下一个成功的消息,那么我将丢失前一个不成功的消息,对吗?

共有1个答案

邓建柏
2023-03-14

如果我正确理解您的问题,您的假设是,异常发生是由于您的代码中的一个问题,而不是在阅读主题中的消息时。在这种情况下,没有重试或其他措施将解决您的问题。

我们通常所做的是捕捉到异常,并将其发送到另一个Kafka主题。理想情况下,您还将添加一些详细信息,说明异常发生的原因或发生在哪个代码部分。修复了应用程序中的bug之后,您就可以使用来自其他主题的消息了。

我理解我需要禁用自动提交并手动提交成功的消息,但是,在这种情况下,如果我没有为这个异常情况抛出任何异常,并手动提交每个下一个成功的消息,那么我将丢失前一个不成功的html" target="_blank">消息,对吗?

如果您只希望在极少数情况下可以抛出异常,但您却忽略了它,那么您可以始终使用纯Kafka格式的consumer.seek()方法

public void seek(TopicPartition partition, long offset)

从主题分区的特定偏移量开始读取。

 类似资料:
  • 我正在用SQS和JavaSDK发送和接收消息。几乎所有的消息都工作正常,但是其中一些丢失了,我不明白为什么。这是发送消息的代码: 以及接收代码(在循环中运行): 问题是,我能够接收到一些消息,但有些消息不是(总是相同类型的数据)。发送和接收的代码对于所有消息都是相同的。应用程序日志: 正在发送消息:{QueueUrl:https://sqs.us-east-1.amazonaws.com/0000

  • 寻找设计我的Kafka消费者的最佳方法。基本上,我想看看什么是避免数据丢失的最佳方法,以防在处理消息期间出现任何异常/错误。 我的用例如下。 a)我使用SERVICE来处理消息的原因是 - 将来我计划编写一个ERROR处理器应用程序,该应用程序将在一天结束时运行,它将尝试再次处理失败的消息(不是所有消息,而是由于任何依赖项(如父级缺失)而失败的消息)。 b)我想确保没有消息丢失,所以我会将消息保存

  • 在FLTK中是通过Fl_Widegt::handle(),虚拟函数来处理系统的消息。我们可以查看Fltk的源代码来分析系统是怎样处理一些系统消息的,如按钮的消息处理 /******************************************************* Fl_Button中处理消息的代码,省略了具体的处理代码 *******************************

  • 全部的 这里有一条简单的路线: JsonValidator是一个简单的Javabean,我在其中扩展了处理器。在这里,我想确保在我继续使用Jackson散集调用以将JSON散集到我的POJO之前,所有必需的字段都被传入。 我现在在豆子里做的只是一行: 只需调用exchange.getIn().getBody(String.class),就会导致路由中的下一个(解组)步骤抛出一个错误,表示没有要解组

  • 使用kafka流处理器api 场景:流处理器(使用kafka流处理器api实现)从源主题读取数据,并基于某些业务逻辑将数据写入目标主题。

  • 应用程序有一个JMS队列负责交付审计日志。应用程序将日志发送到JMS队列,该队列由MDB使用。 但是,发送的消息是大 XML 文件,大小从 20 MB 到 100 MB 不等。问题在于 JMS 队列使用消息的时间太长,从而导致内存不足错误。 我应该怎么做才能解决这个问题?