什么是最好的方法来实现死信队列(DLQ)的概念在Spring Boot 2.0应用程序中使用sping-kafka 2.1. x有所有的消息被处理失败的@KafkaListener方法的一些bean发送到一些预定义的Kafka DLQ主题并且不丢失一条信息?
因此,Kafka的记录是:
我尝试使用ErrorHandler的自定义实现创建侦听器容器,使用KafkaTemplate将无法处理的记录发送到DLQ主题。使用禁用的自动提交和记录确认模式。
spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ErrorHandler {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@Value("${dlqTopic}")
private String dlqTopic;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
log.error("Error, sending to DLQ...");
kafkaTemplate.send(dlqTopic, record.key(), record.value());
}
}
这个实现似乎不能保证第3项。如果在DlqErrorHandler中引发异常,侦听器将不会再次使用记录。
事务侦听器容器的使用会有所帮助吗?
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
使用Spring Kafka实现DLQ概念有什么方便的方法吗?
更新2018/03/28
多亏了Gary Russell的回答,我通过实现DlqErrorHandler实现了预期的行为,如下所示
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
...
@Override
public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
Consumerrecord<?, ? record = records.get(0);
try {
kafkaTemplate.send("dlqTopic", record.key, record.value());
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
// Other records may be from other partitions, so seek to current offset for other partitions too
// ...
} catch (Exception e) {
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
// Other records may be from other partitions, so seek to current offset for other partitions too
// ...
throw new KafkaException("Seek to current after exception", thrownException);
}
}
}
这样,如果消费者调查返回3条记录(1、2、3),第二条记录无法处理:
如果发送到DLQ失败,消费者寻求记录。offset()和记录将被重新发送到侦听器(发送到DLQ可能会失效)。
更新2021/04/30
自Spring以来,Kafka2.7.0无阻塞重试和死信主题在本机上得到支持。
参见示例:https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt
重试通常应该是非阻塞的(在单独的主题中完成)并延迟:
请参阅seekToCurInterrorHandler
。
当出现异常时,它会寻找消费者,以便在下一次投票时重新提交所有未处理的记录。
如果DLQ写入失败,您可以使用相同的技术(例如子类)写入DLQ,并查找当前偏移量(和其他未处理的偏移量),如果DLQ写入成功,则只查找剩余记录。
编辑
DeadLetterPublishingRecoverer
是在这个答案发布几个月后添加的。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-信件
我有一个lambda函数,我想为它创建一个SQS死信队列。我首先在Terraform中创建SQS: 这是来自Terraform的例子。但是,我被redrive_policy卡住了。 我是否正确理解,这为SQS队列设置了一个死信队列? 如果我设置了redrive_policy,这意味着我在一个DLQ上设置了一个DLQ。我觉得可以在DLQ上设置DLQ,在DLQ上设置DLQ,以此类推。 我找不到这方面的
死信队列(Dead Letter Queue)本质上同普通的Queue没有区别,只是它的产生是为了隔离和分析其他Queue(源Queue)未成功处理的消息。 创建死信队列的方法参见createQueue() API,与创建普通队列无异, 死信队列不可调用deadMessage(), deadMessageBatch API,其他操作都与对普通Queue的操作无异。 为了将源Queue的未能成功处理
对于异步的触发器,平台会对函数失败的任务进行最多3次重试。 在新建触发器的时候,为触发器配置一条死信队列,从用户的EMQ队列中选择一条,用于接收函数失败的任务。 在设置死信队列前,请对group: CIf76b0600-24e9-42c4-acf3-d491fbd9fd71 授予 FULL_CONTROL 权限,若不授予权限,平台将丢弃失败的任务信息。 消息的内容如下,以后可能增加字段,请用户在
未创建我的exchange和dlq。我在下面的YML中有以下内容。我确实创建了一个匿名队列,但也没有发布消息。任何想法。
我有一个使用Spring Cloud Streams-RabbitMQ在微服务中交换消息的项目。对我的项目至关重要的一件事是,我不能丢失任何信息。 null 我是这些框架的新手,我希望你能帮助配置我的...
我正在尝试设计一个基于SQS、Lambda和SNS的小型消息处理系统。在失败的情况下,我希望将消息排入死信队列(DLQ)中,并调用webhook。 目前,如果一切顺利,流程应该是这样的: SQS(用于处理重试)将消息排入队列 lambda由SQS调用并处理消息 lambda发送webhook,并正常完成 如果lambda中的某些东西出错(无法调用success webhook,无法处理手头的任务)