当前位置: 首页 > 知识库问答 >
问题:

反序列化错误后如何提交偏移量?SpringKafka

谢俊悟
2023-03-14

我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。
由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量?
谢谢。

我正在使用的自定义错误处理程序:

 class KafkaErrorHandler implements ConsumerAwareErrorHandler{

private static final Logger LOG = LoggerFactory.getLogger(KafkaErrorHandler.class);

@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
    if (!records.isEmpty()) {
        ConsumerRecord<?, ?> record = records.get(0);
        String topic = record.topic();
        long offset = record.offset();
        int partition = record.partition();
        if (thrownException.getClass().equals(DeserializationException.class)) {
            DeserializationException exception = (DeserializationException) thrownException;
            String malformedMessage = new String(exception.getData());
            LOG.info("Skipping message with topic {} and offset {} " +
                    "- malformed message: {} , exception: {}", topic, offset, malformedMessage, exception.getLocalizedMessage());
        } else {
            LOG.info("Skipping message with topic {} - offset {} - partition {} - exception {}", topic, offset, partition, thrownException);
        }
    } else {
        LOG.info("Consumer exception - cause: {}", thrownException.getMessage());
    }
}

}

共有1个答案

太叔景同
2023-03-14

使用<代码>确认模式。手动_立即,请参阅

/**
 * Set to true to commit the offset for a recovered record.
 * The container must be configured with
 * {@link org.springframework.kafka.listener.ContainerProperties.AckMode#MANUAL_IMMEDIATE}.
 * Whether or not the commit is sync or async depends on the container's syncCommits
 * property.
 * @param commitRecovered true to commit.
 */
@Override
public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
    super.setCommitRecovered(commitRecovered);
}

DefaultErrorHandler上。

确认模式下不可能。手册;对于其他AckModes,默认情况下,容器将提交恢复记录的偏移量(基于isAckAfterHandle())。

编辑

提交偏移量的逻辑位于SeekUtils.seekOrRecover(在查找之后)。

if (commitRecovered) {
    if (container.getContainerProperties().getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
        ConsumerRecord<?, ?> record = records.get(0);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1));
        if (container.getContainerProperties().isSyncCommits()) {
            consumer.commitSync(offsetToCommit, container.getContainerProperties().getSyncCommitTimeout());
        }
        else {
            OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
            if (commitCallback == null) {
                commitCallback = LOGGING_COMMIT_CALLBACK;
            }
            consumer.commitAsync(offsetToCommit, commitCallback);
        }
    }
    else {
        logger.debug(() -> "'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE, not "
                + container.getContainerProperties().getAckMode());
    }
}
 类似资料:
  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我使用MANUAL_IMMEDIATEack模式,Spring-kafka 1.3.9(不能更改为Java8),并在监听器代码中完成处理时提交偏移量。我使用自定义反序列化器及其工作正常,除非我遇到反序列化异常。然后Kafka卡住了。我已经处理了这个由Deserializer,喜欢而不是抛出异常(当反序列化异常发生)我得到一个反序列化对象的新实例,并设置原始消息(导致反序列化异常)在一个字段(异常数

  • 我有一个spring kafka消费者,它读取记录并将其交给缓存。计划的任务将定期清除缓存中的记录。我只想在批成功保存到数据库中后更新提交偏移量。我尝试将Acknowledgement对象传递给缓存服务以调用acknowledge方法,如下所示。 确认模式设置如下: 自动提交是错误的: 即使调用了确认方法,提交偏移量也不会更新。持久化记录后更新提交偏移量的最佳方法是什么? 我正在使用spring-