当前位置: 首页 > 知识库问答 >
问题:

在@KafkaListener-方法中确认,而不会“丢失”消息

田慈
2023-03-14

我们目前基本上通过以下简化机制确认消息:

  @KafkaListener(topics = "someTopic")
  public void listen(final String message, final Acknowledgment ack) {
    try {
        processMessage(message);
        ack.acknowledge();
    } catch (final IOException e) {
        // do not acknowledge here since we can temporarily not process the message
    } 

基本上,每当我们暂时不能处理消息时(在IOExceptions的情况下),我们希望在以后的时间再次接收它。

但这不起作用,因为acknowledge假设同一分区内以前的所有消息都已成功处理。在我们的IOException案例中,失败的消息将被跳过,但可能会被同一分区上具有更高索引的不同消息确认。

我们对如何解决这个问题有一些想法,但这意味着需要一些棘手的解决方法,以避免在KafkaListener方法中直接调用acknowledge。我们的用例是非常具体的,还是更像SpringKafka用户假设的“默认”行为?

对于这类问题有没有一个SpringKafka的解决方案?或者你有没有办法“正确”解决这个问题?

共有1个答案

习旻
2023-03-14

这就是Kafka的工作方式;您可以启用“重试”以尝试在继续下一条消息之前通过IOException。

您可以配置一个错误句柄,以便将失败消息发布到另一个主题,以便稍后重播。或者,错误处理程序可以停止容器以阻止任何新的交付。

重新启动容器时,将重播消息。

 类似资料:
  • 我试图以CSV格式保存py spark . SQL . data frame . data frame(也可以是其他格式,只要它易于阅读)。 到目前为止,我找到了几个示例来保存DataFrame。然而,每次我编写它时,它都会丢失信息。 数据集示例: 为了将这个文件保存为CSV,我首先尝试了这个解决方案: 不幸是,这导致了以下错误: 这就是我尝试另一种可能性的原因,将spark数据帧转换成panda

  • 主要内容:1.RocketMq架构,2.消息不丢失1.RocketMq架构 Producer,Consumer,Brocker,Name Server 2.消息不丢失 1.Producer发送消息 2.Brocker保存消息 3.Consumer 消费消息 4.Brocker主从切换 2.1 同步发送 同步发送会返回状态码 1.SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave

  • 这篇文章,给不太熟悉MQ技术的同学,介绍一个生产环境中可能会遇到的问题。 目前为止,你的RabbitMQ部署在线上服务器了,对吧?然后订单服务和仓储服务都可以基于RabbitMQ来收发消息,同时仓储服务宕机,不会导致消息丢失。 好,我们来看下目前为止的架构图。 那如果此时出现一个问题,就是说订单服务投递了订单消息到RabbitMQ里去,RabbitMQ暂时放在了自己的内存中,还没来得及投递给下游的

  • 问题内容: 我正在使用SQL数据库,我有一列名为“价格”。创建数据库后,将“价格”列设置为“我”,需要将其类型更改为不丢失数据库中的数据。这应该通过SQL脚本来完成 我想到了创建一个新列,将数据移到其中,删除旧列,然后重命名新创建的列。 有人可以帮我举个例子吗?在SQL中也有一个函数可以将字符串解析为十进制? 谢谢 问题答案: 您无需添加新列两次,只需在更新新列后删除旧列即可: 请注意,如果不是数

  • 我正在用SQS和JavaSDK发送和接收消息。几乎所有的消息都工作正常,但是其中一些丢失了,我不明白为什么。这是发送消息的代码: 以及接收代码(在循环中运行): 问题是,我能够接收到一些消息,但有些消息不是(总是相同类型的数据)。发送和接收的代码对于所有消息都是相同的。应用程序日志: 正在发送消息:{QueueUrl:https://sqs.us-east-1.amazonaws.com/0000

  • 问题内容: 我需要使用Python调整jpg图像的大小,而又不丢失原始图像的EXIF数据(有关拍摄日期,相机型号等的元数据)。所有有关python和图像的google搜索都指向我当前正在使用的PIL库,但似乎无法保留元数据。我到目前为止(使用PIL)的代码是这样的: 有任何想法吗?还是我可能正在使用的其他库? 问题答案: http://www.emilas.com/jpeg/