当前位置: 首页 > 知识库问答 >
问题:

SpringKafka-如何重置偏移量到最新与一个组ID?

孟英光
2023-03-14
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}

我想开始最新的抵消,而不是为旧的价值所困扰。是否有可能重置该组的偏移量?

共有1个答案

袁赞
2023-03-14

因为我没有看到任何这样的例子,我要解释一下我是怎么做的。

您的@KafKalistener的类必须实现ConsumerSeekAware类,这将允许侦听器在分区被归属时控制偏移量搜索。(来源:https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek)

public class KafkaMessageListener implements ConsumerSeekAware {
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {


    }
}

这里,在重新平衡时,我们使用给定的回调来为所有给定的主题寻求最后的偏移量。感谢Artem Bilan(https://stackoverflow.com/users/2756547/artem-bilan)指导我找到答案。

 类似资料:
  • 在我的侦听器中,在使用消息后,如果发生任何异常,我将抛出一个异常。如果它成功了,那么我承认。但是,即使抛出异常,偏移量也不会后退。i、 e重试没有按预期进行。错误事件不会再次出现。 此外,我发现我没有消费所有预期的消息。我做错什么了吗? 监听器类 我正在做

  • 我使用的是spring data 1.5.8和Kafka的手动确认模式。只有一个简单的ErrorHandler接口,handle方法有参数exception和consumerRecord,但是如果json无法反序列化时抛出异常,我如何将偏移量设置为next?我需要省略这条信息,否则消费者会被卡在那里 spring data 2.0引入ConsumerRawareErrorHandler,唯一的方法

  • 我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。 由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量? 谢谢。 我正在使用的自定义错误处理程序: }

  • 有一种情况,当消费者1阅读来自Kafka主题的消息时。当使用相同的groupId连接第二个用户2时,需要重新平衡分区。有没有可能以某种方式重置偏移,以便在重新平衡过程之后,两个消费者都从头开始阅读主题?

  • 我对Kafka0.11.0.0有意见 在Kafka0.10.2.1中我对此没有任何问题。我只在0.11.0.0版本中遇到这个问题。 我的使用者将auto.offset.reset设置为最早,而auto commit设置为false,因为我是手动提交的。Kafka数据存储在具有必要权限的非TMP目录中。broker配置的其余部分为默认配置。 我需要0.11.0.0版本的事务。我不知道问题出在哪里。这

  • 问题描述: 我们的Kafka consumer(在Spring Boot2.x中开发)正在执行几天。当我们重新启动这些消费者时,主题的所有消息都将被再次消费,但仅在特定条件下。 条件: 代理配置: 谢谢和问候