当前位置: 首页 > 知识库问答 >
问题:

SpringKafka消费者重试,回退间隔长,给出“组织.apache.kafka.clients.消费者.Commit失败异常”

陆啸
2023-03-14

我是Spring-Kafka的新手,在使用Spring Kafka RetryTemplate处理kafka消息期间,尝试在失败或任何异常的情况下实现重试。

我使用了以下代码:

//这是KafkaListenerContainerFactory:

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(retryContext -> {
        ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
        logger.info("Recovery is called for message {} ", consumerRecord.value());
        return Optional.empty();
    });
    return factory;
}

重试模板

public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    // Todo: take from config
    fixedBackOffPolicy.setBackOffPeriod(240000);// 240seconds
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    // Todo: take from config
    simpleRetryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    return retryTemplate;
}
    
//

这是消费者工厂

public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

当任何异常发生时,它会按照重试策略按预期重试。一旦max重试耗尽,它就会调用恢复回调方法。但很快,它会给出“java.lang.IllegalStateException:此错误处理程序无法处理org.apache.kafka.clients.consumer.CommitFailedException;没有可用的记录信息”,其中包含一些细节,例如:失败的OffsetCommit请求,因为消费者不是活动组的一部分。

它似乎无法提交偏移量,因为消费者现在从组中启动,因为它在下一次轮询之前长时间处于空闲状态(回退期*(maxretry-1))。

我需要添加一些大价值的max.poll.interval.ms吗?

有没有其他方法可以实现这一点,这样即使消费者在处理过程中花费了很多时间并计划以长时间间隔重试,也不会出现提交失败错误。

请帮帮我。

共有1个答案

卢才艺
2023-03-14

聚合backOff延迟必须小于max.poll.interval。ms以避免重新平衡。

现在最好使用<code>SeekToCurrentErrorHandler</code>而不是<code>RetryTemplate</code>,因为只有每个延迟(而不是聚合)需要小于<code>max.poll.interval.ms</code>

此处为文档。

 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。