当前位置: 首页 > 知识库问答 >
问题:

从开始阅读主题

刘海
2023-03-14

我想从话题的一开始就开始消费。我已经将属性“AUTO\u OFFSET\u RESET\u CONFIG”设置为最早,但不知何故它仍然没有从一开始就读取。

如果我错过了什么,有什么想法吗?我每次都在创造一个新的消费群体。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);

    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getConsumerConfigs(false));
}

private Map<String, Object> getConsumerConfigs(boolean isEmbedded) {
       Map<String, Object> props = new HashMap<>();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, isEmbedded ? embeddedBootstrapServers : bootstrapServers);
       props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId + "temp");
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

       props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMillis);
       props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMillis);

       return props;
}

共有1个答案

段晨
2023-03-14

该属性仅适用于从未消费过的新消费群体。使用ConsumerSekAware消息侦听器,您可以为每个分配的主题/分区调用seektobegining

 类似资料:
  • 我的spring boot项目有一个演示Kafka Streams API的应用程序。我可以使用以下命令使用主题中的所有消息 Kafka Streams API中使用KStream或ktable使用消息的类似命令是什么?我试过了 两者都不起作用。我确实创建了一个测试用例,用而不是流来使用,但它不起作用。代码上传到Github以供参考。任何帮助都会很好。

  • 因此,我有一个扫描仪,它可以使用while(file.hasNext())读取多行的文本文件,但是在它到达文本文件的末尾之后,我该如何制作它,以便在单独的while循环中重新开始读取行?

  • 我阅读了Kafka的所有文档,我读到的唯一方法是git和指定 但是为了客户的订单,我需要使用Spring,所以我的Kafkaendpoint是这样的 但是得到一个例外 无法为属性找到合适的setter:offsetRepository,因为没有具有相同类型的setter方法:java.lang.String也不可能进行类型转换:没有类型转换器可用于从类型转换:java.lang.String到所需

  • 如何确保我总是从Kafka主题的一开始就与Flink一起消费? Kafka0.9。x consumer是Flink 1.0.2的一部分,它似乎不再是Kafka,而是Flink来控制偏移量: Flink在内部快照偏移量,作为其分布式检查点的一部分。Kafka/动物园管理员promise的补偿只是为了让外界对进展的看法与Flink对进展的看法保持同步。通过这种方式,监控和其他工作可以了解Flink K

  • 我尝试收听主题,以查看哪个使用者保存了什么值的offsets,但这并不奏效... 我尝试了以下操作: 为控制台使用者创建了配置文件,如下所示: 谢谢! 码头

  • 我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-