当前位置: 首页 > 知识库问答 >
问题:

Kafka简历消费者无法收到第一条消息

越风史
2023-03-14
@StreamListener(ChannelName.MESSAGE_INPUT_RETRY_CHANNEL)
public void onMessageRetryReceive(org.springframework.messaging.Message<Message> message, @Header(KafkaHeaders.CONSUMER)KafkaConsumer<?,?> consumer){

    long waitTime = //Calculate the wait time of the message
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    if(waitTime > 0){
        consumer.pause(Collections.singleton(new TopicPartition("message-retry-topic",0)));
    }else{
        messageProducer.sendMessage(message.getPayload());
        acknowledgment.acknowledge();
    }
}


@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
    return event -> {
        boolean isReady = //Logi to check if ready to resume
        if(isReady){
            event.getConsumer().resume(event.getConsumer().paused());
        }
    };
}

共有1个答案

越骏俊
2023-03-14

不调用acnowledgment.acknowledge();并不意味着KafKaconsumer实例不保留内存中最后使用的位置。

我们肯定需要为分区上的后续使用者提交偏移量。当前运行的消费者不需要这样的信息来提交,因为它有自己的内存状态。

为了能够重新使用相同的记录,您需要执行seek()操作。有关更多信息,请参见文档:https://Docs.spring.io/spring-kafka/Docs/current/reference/html/#seek

 类似资料:
  • 我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生

  • 我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:

  • 我可以在命令行上针对Kafka位置安装发送和接收消息。我也可以通过Java代码发送消息。这些消息显示在Kafka命令提示符中。我还有一个Kafka消费者的Java代码。代码昨天收到了消息。但是今天早上没有收到任何消息。代码没有更改。我想知道属性配置是否不太正确。这是我的配置: 制片人: 生产记录设置为 消费者: 对于Java代码: 少了什么?

  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是

  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外: