当前位置: 首页 > 知识库问答 >
问题:

Kafka的死信队列(DLQ)

许鸿志
2023-03-14

什么是最好的方法来实现死信队列(DLQ)的概念在Spring Boot 2.0应用程序中使用sping-kafka 2.1. x有所有的消息被处理失败的@KafkaListener方法的一些bean发送到一些预定义的Kafka DLQ主题并且不丢失一条信息?

因此,Kafka的记录是:

  1. 已成功处理,
  2. 处理失败,并发送到DLQ主题,
  3. 处理失败,未发送到DLQ主题(由于意外问题),因此将再次被侦听器使用

我尝试使用ErrorHandler的自定义实现创建侦听器容器,使用KafkaTemplate将无法处理的记录发送到DLQ主题。使用禁用的自动提交和记录确认模式。

spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Error, sending to DLQ...");
        kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
}

这个实现似乎不能保证第3项。如果在DlqErrorHandler中引发异常,侦听器将不会再次使用记录。

事务侦听器容器的使用会有所帮助吗?

factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

使用Spring Kafka实现DLQ概念有什么方便的方法吗?

更新2018/03/28

多亏了Gary Russell的回答,我通过实现DlqErrorHandler实现了预期的行为,如下所示

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    ...
    @Override
    public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        Consumerrecord<?, ? record = records.get(0);
        try {
            kafkaTemplate.send("dlqTopic", record.key, record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
        } catch (Exception e) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
            throw new KafkaException("Seek to current after exception", thrownException);
        }
    }
}

这样,如果消费者调查返回3条记录(1、2、3),第二条记录无法处理:

  • 1将被处理
  • 2将处理失败并发送到DLQ
  • 3感谢消费者寻求record.offset()1,它将被传递给听众

如果发送到DLQ失败,消费者寻求记录。offset()和记录将被重新发送到侦听器(发送到DLQ可能会失效)。

更新2021/04/30

自Spring以来,Kafka2.7.0无阻塞重试和死信主题在本机上得到支持。

参见示例:https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt

重试通常应该是非阻塞的(在单独的主题中完成)并延迟:

  • 不干扰实时通信

共有1个答案

危飞文
2023-03-14

请参阅seekToCurInterrorHandler

当出现异常时,它会寻找消费者,以便在下一次投票时重新提交所有未处理的记录。

如果DLQ写入失败,您可以使用相同的技术(例如子类)写入DLQ,并查找当前偏移量(和其他未处理的偏移量),如果DLQ写入成功,则只查找剩余记录。

编辑

DeadLetterPublishingRecoverer是在这个答案发布几个月后添加的。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-信件

 类似资料:
  • 我有一个lambda函数,我想为它创建一个SQS死信队列。我首先在Terraform中创建SQS: 这是来自Terraform的例子。但是,我被redrive_policy卡住了。 我是否正确理解,这为SQS队列设置了一个死信队列? 如果我设置了redrive_policy,这意味着我在一个DLQ上设置了一个DLQ。我觉得可以在DLQ上设置DLQ,在DLQ上设置DLQ,以此类推。 我找不到这方面的

  • 死信队列(Dead Letter Queue)本质上同普通的Queue没有区别,只是它的产生是为了隔离和分析其他Queue(源Queue)未成功处理的消息。 创建死信队列的方法参见createQueue() API,与创建普通队列无异, 死信队列不可调用deadMessage(), deadMessageBatch API,其他操作都与对普通Queue的操作无异。 为了将源Queue的未能成功处理

  • 对于异步的触发器,平台会对函数失败的任务进行最多3次重试。 在新建触发器的时候,为触发器配置一条死信队列,从用户的EMQ队列中选择一条,用于接收函数失败的任务。 在设置死信队列前,请对group: CIf76b0600-24e9-42c4-acf3-d491fbd9fd71​ 授予 FULL_CONTROL 权限,若不授予权限,平台将丢弃失败的任务信息。 消息的内容如下,以后可能增加字段,请用户在

  • 未创建我的exchange和dlq。我在下面的YML中有以下内容。我确实创建了一个匿名队列,但也没有发布消息。任何想法。

  • 我有一个使用Spring Cloud Streams-RabbitMQ在微服务中交换消息的项目。对我的项目至关重要的一件事是,我不能丢失任何信息。 null 我是这些框架的新手,我希望你能帮助配置我的...

  • 我正在尝试设计一个基于SQS、Lambda和SNS的小型消息处理系统。在失败的情况下,我希望将消息排入死信队列(DLQ)中,并调用webhook。 目前,如果一切顺利,流程应该是这样的: SQS(用于处理重试)将消息排入队列 lambda由SQS调用并处理消息 lambda发送webhook,并正常完成 如果lambda中的某些东西出错(无法调用success webhook,无法处理手头的任务)