当前位置: 首页 > 知识库问答 >
问题:

Kafka Spring使用者偏移量不使用ConsumerRecordRecoverer提交

唐星晖
2023-03-14

版本:

spring-boot : 2.2.2.RELEASE
spring-kafka : 2.3.7.RELEASE
kafka broker : 2.3.1 (via amazon MSK)
auto.offset.reset: earliest
enable.auto.commit: false
isolation.level: read_committed
    null

Kafkaconfiguration

@Configuration
public class KafkaConfig {


  @Bean
  ConsumerRetryConfig retryConfig() {
    return new ConsumerRetryConfig();
  }

  @Bean
  public RetryTemplate consumerRetryTemplate(ConsumerRetryConfig consumerRetryConfig) {
    RetryTemplate retryTemplate = new RetryTemplate();

    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(consumerRetryConfig.getRetryWaitInterval());
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(consumerRetryConfig.getMaxRetries());
    retryTemplate.setRetryPolicy(retryPolicy);

    return retryTemplate;
  }

  @Bean
  @Lazy
  FiniteRequeueingRecovererConfig finiteRequeueingRecovererConfig() {
    return new FiniteRequeueingRecovererConfig();
  }

  @Bean
  @Lazy
  FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer(
    KafkaTemplate<String, SpecificRecord> kafkaTemplate,
    FiniteRequeueingRecovererConfig finiteRequeueingRecovererConfig
  ) {
    return new FiniteRequeueingRecordRecoverer(kafkaTemplate, finiteRequeueingRecovererConfig.getMaxRequeues());
  }

  @Bean
  @Lazy
  DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor(
    FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer,
    ConsumerRetryConfig consumerRetryConfig
  ) {
    DefaultAfterRollbackProcessor ret = new DefaultAfterRollbackProcessor(
      finiteRequeueingRecordRecoverer,
      new FixedBackOff(
        consumerRetryConfig.getRetryWaitInterval(),
        consumerRetryConfig.getMaxRetries()
      )
    );
    ret.setCommitRecovered(true);
    return ret;
  }

  @Bean
  public ProducerFactory<String, SpecificRecord> avroMessageProducerFactory(KafkaProperties kafkaProperties) {
    Map<String, Object> props = MapBuilder.<String, Object>builder()
      .putAll(kafkaProperties.buildProducerProperties())
      .put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString())
      .build();

    return (kafkaAvroSerializer==null) ?
      new DefaultKafkaProducerFactory<>(props) :
      new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer);
  }

  @Bean
  public KafkaTemplate<String, SpecificRecord> avroMessageKafkaTemplate(ProducerFactory<String, SpecificRecord> avroMessageProducerFactory) {
    return new KafkaTemplate<>(avroMessageProducerFactory);
  }

  @Bean
  public KafkaTransactionManager<?,?> kafkaTransactionManager(ProducerFactory<String, SpecificRecord> avroMessageProducerFactory) {
    return new KafkaTransactionManager<>(avroMessageProducerFactory);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> finiteRequeueingKafkaListenerContainerFactory(
    ConsumerFactory<Object, Object> consumerFactory,
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    KafkaTransactionManager<Object, Object> kafkaTransactionManager,
    DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor
  ) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

    factory.setStatefulRetry(true);
    factory.setAfterRollbackProcessor(finiteRequeueingRollbackProcessor);

    return factory;
  }

  @KafkaListener(
    id = "${some.listener-id}",
    topics = "${some.topic}",
    groupId = "${some.group-id}",
    containerFactory = "finiteRequeueingKafkaListenerContainerFactory"
  )
  public void consume(
    @Payload WebhookNotificationMessage message,
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
  ) throws Exception {

    // Do the thing, maybe throw an exception

  }

}

FiniteRequeueingRecordRecoverer

public class FiniteRequeueingRecordRecoverer implements ConsumerRecordRecoverer {
  private final Logger logger = LoggerLike.getLogger(FiniteRequeueingRecordRecoverer.class);

  private KafkaTemplate<String, SpecificRecord> kafkaTemplate;
  private Integer maxRequeues;

  public FiniteRequeueingRecordRecoverer(KafkaTemplate<String, SpecificRecord> kafkaTemplate, Integer maxRequeues) {
    this.kafkaTemplate = kafkaTemplate;
    this.maxRequeues = maxRequeues;
  }

  @Override
  public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {

    // Not sure the substance of this recoverer is relevant...but if so
    // If the retry number in the avro record is < this.maxRequeues
    //   then increment the retries and re enqueue this message, move on
    // If retries have been exhausted, do not requeue and send to a dead letter or just abandon
  }
}

共有1个答案

任伟
2023-03-14

defaultafterrollbackprocessor需要kafkatemplate将偏移量发送到新事务。

如果commitrecover为true并且没有KT,我们可能应该记录一个警告。

 类似资料:
  • 我正在使用spring with Kafka来消费来自Kafka主题的数据。我已经将并发配置为10。因此不同的线程轮询代理以获取消息并处理消息。即使在一段时间后(成功处理),我们也会收到相同的消息返回给使用者的不同线程。我们能够在配置的max.poll.interval.ms=1500000内处理接收到的消息。 请找到以下配置的Kafka消费者属性。我已经通过Kafka配置了自动提交。 你能帮我解

  • 我有一个版本1.1.0中的kafka控制台消费者,我用它从kafka获取消息。当我使用带有option-max-messages的kafka-console-consumer.sh脚本时,它似乎提交了错误的偏移量。 我创建了一个主题和一个消费者小组,并阅读了一些消息:

  • 在Kafka消费者中,自动偏移重置:最新和启用自动提交:false的主要目的是什么。 在Kafka consumer中,我是否可以在KafkaListner期间检查是否能够接收重复记录或处理以前的记录 在spring boot中创建消费者端时,需要记住的所有必要事项以及日志记录的所有重要事项是什么,以便日志记录具有一定的意义

  • 我用的是Kafka的高级消费者。因为我使用Kafka作为我的应用程序的“事务队列”,所以我需要绝对确保不会错过或重读任何消息。关于这一点,我有两个问题: > 如何将偏移量提交给zoomaster?我将在每条消息成功消费后关闭自动提交和提交偏移量。我似乎找不到如何使用高级消费者执行此操作的实际代码示例。有人能帮我吗? 另一方面,我听说promisezooeger可能会很慢,所以另一种方法可能是在本地

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。