当前位置: 首页 > 知识库问答 >
问题:

如何在 Spring Kafka 中提交偏移量,如果消息失败并由 AfterRollbackProcessor 处理

曾新
2023-03-14

我用的是SpringKafka2.2.9的Spring靴2.1.9

如果消息多次失败(在afterRollbackProcessor中定义),消费者将停止轮询记录。但如果消费者重新启动,它会再次轮询相同的消息和进程。

但是我不希望消息再次被重新轮询,最好的阻止方法是什么?

这是我的配置

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Concurrency
    @Value("${kafka.poll.timeout:300}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    @Value("${kafka.max.session.timeout:60000}")
    private Integer sessionTimoutMs;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);


    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);

        AfterRollbackProcessor<String, String> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
                (record, exception) -> {
                    log.warn("failed to process kafka message (retries are exausted). topic name:" + record.topic()
                            + " value:" + record.value());
                    messageProducer.saveFailedMessage(record, exception);
                }, retryMaxAttempts);

        factory.setAfterRollbackProcessor(afterRollbackProcessor);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }

}

我怎样才能做到这一点?

共有1个答案

钱欣悦
2023-03-14

promise恢复属性设置为true并注入一个Kafka模板,该工厂与事务管理器配置为相同的生产者工厂。

/**
 * {@inheritDoc}
 * Set to true and the container will run the
 * {@link #process(List, Consumer, Exception, boolean)} method in a transaction and,
 * if a record is skipped and recovered, we will send its offset to the transaction.
 * Requires a {@link KafkaTemplate}.
 * @param commitRecovered true to process in a transaction.
 * @since 2.2.5
 * @see #isProcessInTransaction()
 * @see #process(List, Consumer, Exception, boolean)
 * @see #setKafkaTemplate(KafkaTemplate)
 */
@Override
public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
    super.setCommitRecovered(commitRecovered);
}

编辑

这是过程中的逻辑...

    if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
            getSkipPredicate((List) records, exception), this.logger)
                && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {

        // if we get here it means retries are exhausted and we've skipped

        ConsumerRecord<K, V> skipped = records.get(0);
        this.kafkaTemplate.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                        new OffsetAndMetadata(skipped.offset() + 1)));
    }

编辑2

在 2.2.x 中,该属性为

/**
 * Set to true to run the {@link #process(List, Consumer, Exception, boolean)}
 * method in a transaction. Requires a {@link KafkaTemplate}.
 * @param processInTransaction true to process in a transaction.
 * @since 2.2.5
 * @see #process(List, Consumer, Exception, boolean)
 * @see #setKafkaTemplate(KafkaTemplate)
 */
public void setProcessInTransaction(boolean processInTransaction) {
    this.processInTransaction = processInTransaction;
}
 类似资料:
  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。 由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量? 谢谢。 我正在使用的自定义错误处理程序: }

  • 在这里,consumerTwo没有确认该消息。我想根据可配置的时间重试将所有消息再次发送给消费者。 任何例子或建议的建议将是很大的帮助。

  • 我有一个关于Kafka自动提交机制的问题。我正在使用启用自动提交的Spring-Kafka。作为一个实验,我在系统空闲(主题中没有新消息,没有正在处理的消息)的情况下,将我的消费者与Kafka的连接断开了30秒。重新连接后,我收到了如下几条消息: 第一,我不明白有什么好犯的?系统空闲(所有以前的消息都已提交)。第二,断开时间为30秒,比max.poll.interval.ms的5分钟(300000

  • 我正在使用spring with Kafka来消费来自Kafka主题的数据。我已经将并发配置为10。因此不同的线程轮询代理以获取消息并处理消息。即使在一段时间后(成功处理),我们也会收到相同的消息返回给使用者的不同线程。我们能够在配置的max.poll.interval.ms=1500000内处理接收到的消息。 请找到以下配置的Kafka消费者属性。我已经通过Kafka配置了自动提交。 你能帮我解

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。