当前位置: 首页 > 知识库问答 >
问题:

如何处理Kafka Consumer中的错误

顾烨磊
2023-03-14

我有以下Kafka配置类:

@Configuration
@AllArgsConstructor(access = AccessLevel.PROTECTED)

public class KafkaConfiguration {
private final KafkaConfigurationProperties kafkaConfigurationProperties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> debtCollectorConsumerContainerFactory() {
     ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfiguration()));
    factory.setConcurrency(kafkaConfigurationProperties.getDebtCollectorConsumerThreads());
    factory.setStatefulRetry(true);
    factory.setErrorHandler(new SeekToCurrentErrorHandler((record, exception) -> {
        if (exception instanceof SomeCustomException) {
            // here I want to mannually Acknowledge the consuming of the record
        }
    }, 10));

    ContainerProperties containerProperties = factory.getContainerProperties();
    containerProperties.setAckOnError(false);
    containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
    return factory;
}

@Bean
@Qualifier(KAFKA_LOAN_REPAYMENT_PRODUCER)
public Producer<String, RepaymentEvent> loanRepaymentProducer() {
    return new KafkaProducer<>(producerConfiguration());
}

@Bean
@Qualifier(KAFKA_DEBT_COLLECTOR_PRODUCER)
public Producer<String, RepaymentEvent> debtCollectorProducer() {
    return new KafkaProducer<>(producerConfiguration());
}

private Map<String, Object> consumerConfiguration() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerGroupId());
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerAutoOffsetReset());
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerMaxPollRecords());
    properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, Boolean.TRUE);
    properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfigurationProperties.getConfluentEndpoint());
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
    return properties;
}

private Map<String, Object> producerConfiguration() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfigurationProperties.getConfluentEndpoint());
    return properties;
}
}

以及以下KafkaListener:

@Slf4j
@Component
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class DebtCollectorIncomingClient {

private final RepaymentTransferProcessService repaymentTransferProcessService;

@KafkaListener(
        topics = "${kafka.debtCollectorIncomingTopic}",
        groupId = "${kafka.debtCollectorConsumerAutoOffsetReset}",
        containerFactory = "debtCollectorConsumerContainerFactory")
public void submitMoneyTransferCommand(@Payload RepaymentEvent repaymentEvent) {
    log.info("Receiving command: {}", repaymentEvent);
    if (repaymentEvent.getPayload() instanceof RepaymentRequestTransfer) {
        RepaymentTransfer repaymentTransfer = aRepaymentTransfer(repaymentEvent);
        repaymentTransferProcessService.startRepaymentTransferProcess(repaymentTransfer);
    }
}

private RepaymentTransfer aRepaymentTransfer(RepaymentEvent repaymentEvent) {
    RepaymentRequestTransfer repaymentRequestTransfer = (RepaymentRequestTransfer) repaymentEvent.getPayload();
    return RepaymentTransfer.builder()
            .clientId(repaymentRequestTransfer.getClientId())
            .contractId(repaymentRequestTransfer.getContractId())
            .amount(BigDecimal.valueOf(repaymentRequestTransfer.getAmount()))
            .currency(Currency.getInstance(repaymentRequestTransfer.getCurrency().name()))
            .debtCollectorExternalId(repaymentEvent.getCorrelationId())
            .debtType(repaymentRequestTransfer.getDebtType())
            .build();
}
}

我想使用<code>SeekToCurrentErrorHandler</code>进行错误处理,我想使用类似于这里的特定功能,但目前我正在使用<code>springBootVersion=2.0.4。你能帮我设置依赖项和配置以处理Kafka消费者中的错误吗?

问候!

共有2个答案

潘俊
2023-03-14

经过几天阅读Gary在其他帖子上的回答,我终于找到了解决我问题的方法。也许这个问题不是很描述性,但是这个答案描述了我想要的行为。

在@Configuration我正在创建下面的春豆:

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> debtCollectorConsumerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfiguration()));
        factory.setConcurrency(kafkaConfigurationProperties.getDebtCollectorConsumerThreads());
        factory.setErrorHandler(new BlockingSeekToCurrentErrorHandler());

        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setAckOnError(false);
        containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);

        factory.setRetryTemplate(retryTemplate());
        return factory;
    }

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(kafkaConfigurationProperties.getDebtCollectorConsumerRetryAttempts()));
    return retryTemplate;
}

和<code>BlockingSeekToCurrentErrorHandler</code>类:

public class BlockingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private static final int MAX_RETRY_ATTEMPTS = Integer.MAX_VALUE;

    BlockingSeekToCurrentErrorHandler() {
        super(MAX_RETRY_ATTEMPTS);
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        try {
            if (!records.isEmpty()) {
                log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
                MetricFactory.handleDebtCollectorIncomingBlockingError(records.get(0), exception);
                super.handle(exception, records, consumer, container);
            }
        } catch (SerializationException e) {
            log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
            MetricFactory.handleDebtCollectorIncomingDeserializationError(records, e);
        }
    }
}
范峰
2023-03-14

< code > seektocurrenterrathandler 从版本2.0.1开始就可用了。版本2.2中增加了额外的功能(重试一定次数后恢复)。

使用Spring Boot 2.1.4和Spring for Apache Kafka 2.2.6(Boot 2.1.5即将推出)。

 类似资料:
  • 问题内容: 作为节点程序员。我习惯于使用“ nodebacks”来处理代码中的错误: 编写该函数时,我可以执行以下操作: 我如何用promises处理这种错误? 问题答案: 经验法则 每当您对如何使用Promise有所疑问时,请考虑一下同步版本。 至少对我来说,这比第一个参数有时是的回调要干净得多。 promises方式几乎总是与问题的同步版本非常相似: 使用回调函数时,myFn看起来像什么: 使

  • 本文向大家介绍如何处理HTML5 Web Worker中的错误?,包括了如何处理HTML5 Web Worker中的错误?的使用技巧和注意事项,需要的朋友参考一下 下面显示了Web Worker JavaScript文件中错误处理功能的示例,该功能将错误记录到控制台。 示例

  • 由于有很多方法,如onErrorReturn、onErrorResume等,所以哪一个是正确的方法,以处理mono和flux的Reactive Spring webflux中的错误?

  • 问题内容: 我试图弄清楚如何使用boto3进行正确的错误处理。 我正在尝试创建一个IAM用户: 成功调用create_user后,我得到一个整洁的对象,其中包含API调用的http状态代码和新创建的用户的数据。 例: 这很好。但是,如果失败(例如用户已经存在),我只会得到一个类型为botocore.exceptions.ClientError的对象,其中只有文本可以告诉我出了什么问题。 示例:Cl

  • 问题内容: 如何处理此调用上的etimedout错误? 有没有办法等待更长的时间?还是再次请求远程文件? 究竟是什么会导致此错误?仅超时? 问题答案: 这是由于在给定时间内未收到您的请求响应(通过 请求模块选项)引起的。 基本上首先要捕获该错误,您需要在上注册一个处理程序,因此不会再抛出未处理的错误:。这里还有一些解释。 在处理程序中,您可以检查错误是否为ETIMEDOUT并应用自己的逻辑:。 如

  • 问题内容: 假设我有以下jQuery AJAX调用: 现在,当进行此AJAX调用时,我希望服务器端代码通过一些错误处理检查来运行(例如,用户是否仍在登录,他们是否有权使用此调用,数据是否有效,等等)。如果检测到错误,如何将错误消息冒泡回客户端? 我最初的想法是返回一个带有两个字段的JSON对象:error和errorMessage。然后将在jQuery AJAX调用中检查以下字段: 这对我来说有点