当前位置: 首页 > 知识库问答 >
问题:

如何防止主主题重新使用 DLQ 处理的消息?

龚跃
2023-03-14

我有一个 kafka 消费者类,它有一个主主题侦听器和一个 DLQ 侦听器。当主主题监听器无法处理消费者记录时,根据我的 bean 工厂,记录被推送到 DLQ 主题中。因此,DLQ 成功处理了该消息。但是,当我重新启动使用者应用程序时,我看到 DLQ 处理的消息再次被主主题侦听器使用,尽管它已成功处理。有人可以帮助我如何防止主要主题重新使用DLQ处理的消息吗?提前感谢您!

Kafka·Consumer.java

public class KafkaConsumer {
//MAIN TOPIC LISTENER
@KafkaListener(id = "main-topic", topics = "main-topic", groupId = "main", containerFactory = "kafkaListenerContainerFactory", clientIdPrefix = "main-topic")
    public void mainListener(ConsumerRecord<String, String> consumerRecord, Acknowledgement ack) {
     //The below line of code converts the consumerRecord value into an Object class and save the converted object to DB.   
     dbService.saveTodb(consumerRecord.value(), new ObjectMapper());
        ack.acknowledge();
    }

//DLQ LISTENER
@KafkaListener(id = "DLQ-topic", topics = "DLQ-topic", groupId = "main", clientIdPrefix = "DLQ", autostartup= "false")
    public void mainListener(ConsumerRecord<String, String> consumerRecord, Acknowledgement ack) {
        dbService.saveTodb(consumerRecord.value(), new ObjectMapper());
        ack.acknowledge();
    }
}

KafkaBeanFactory.java

public class KafkaBeanFactory{
@Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        var recoverer = new DeadLetterPublishingRecoverer(template,
                (record, ex) -> new TopicPartition("DLQ-topic", record.partition()));
        var errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(3, 20000));
        errorHandler.addRetryableExceptions(JsonProcessingException.class, DBException.class);
        errorHandler.setAckAfterHandle(true);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
}
}

应用程序.yaml

 kafka:
    bootstrap-servers: localhost:9092 # sample value 
    client-id: main&DLQ
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<string>";
          security:
            protocol: SASL_SSL
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      groupId: main
      enable-auto-commit: false
      auto.offset.reset: earliest
    listener:
      ack-mode: MANUAL_IMMEDIATE

共有1个答案

谢正初
2023-03-14

您需要在默认错误处理程序上设置< code>commitRecovered属性。

    /**
     * Set to true to commit the offset for a recovered record.
     * The container must be configured with
     * {@link org.springframework.kafka.listener.ContainerProperties.AckMode#MANUAL_IMMEDIATE}.
     * Whether or not the commit is sync or async depends on the container's syncCommits
     * property.
     * @param commitRecovered true to commit.
     */
    @Override
    public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
 类似资料:
  • 我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。 我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。 使用transformValues和StoreBuilder(正

  • 我有一个基于Spring boot的KStreams应用程序,我在其中加入跨多个主题的数据。当一个主题出现延迟时,处理情况的最佳实践是什么?我读过一些链接,比如如何管理Kafka KStream到KStream窗口连接?和其他人。 下面是我的示例代码(Spring Boot应用程序),用于为两个主题--雇员和财务--生成模拟数据。下面是员工主题的代码: 对于金融主题也是如此:

  • 假设我有2个Kafka主题登录和注销按用户名分区,并具有相等数量的分区。如果我运行一个由两个消费者组成的消费者组,同时使用两个主题,我是否可以确定每个用户的登录和注销事件将由同一个消费者处理?

  • 我有一个主题T,它有4个分区TP1、TP2、TP4和TP4。 假设我有8条消息M1到M8。现在当我的制作人将这些消息发送到主题T时,在以下场景下,Kafka经纪人将如何接收它们: 场景1:只有一个kafka broker实例具有前面提到的分区的主题T。 现在假设kafka broker实例1宕机,消费者会作何反应?我假设我的使用者正在读取broker实例1。

  • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。

  • 我正在构建一个由亚马逊服务提供支持的警报系统。 我每天将一个文件放到S3上,它生成一个lambda函数(我们称之为生成器函数)来处理该文件。 Generator基于此文件构建警报并将多条消息发布到SNS主题(让我们称之为发件箱)-由Generator计算的每个收件人一条消息。 我在发件箱中订阅了第二个lambda函数(我们称之为Courier),它应该接收每条消息并对其进行处理。 发电机代码: 以