当前位置: 首页 > 知识库问答 >
问题:

在没有提交操作的情况下使用Java Kafka Consumer是否正确?

衡子安
2023-03-14

我需要从起始偏移量到结束偏移量读取一组记录。我使用专用Kafka消费品。我至少同意一次语义(在这种情况下,如果给定的应用程序实例宕机,新的应用程序实例从该起始偏移量重新读取记录)。

那么,我可以使用这样的代码吗?

private static KafkaConsumer<Long, String> createConsumer() {

    final Properties props = new Properties();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    return new KafkaConsumer<>(props);
}

public void process() {

    KafkaConsumer consumer = createConsumer();
    TopicPartition topicPartition = new TopicPartition("topic", 2);
    consumer.assign(List.of(topicPartition));

    long startOffset = 42;
    long endOffset = 100;

    consumer.seek(topicPartition, startOffset);

    boolean isRunning = true;
    while (isRunning) {
        final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);

        for (ConsumerRecord<Long, String> record : consumerRecords) {
            if (record.offset() >= endOffset) {
                isRunning = false;
                break;
            }
        }
    }

    consumer.close();
}

因此:

  • 我没有commit()

是正确的代码吗?或者它有一些隐藏的问题?

共有1个答案

慕容超
2023-03-14

是的,这是正确的用法,你不应该遇到任何问题。这不是Kafka消费者的典型用法,但这是允许的。

从官方的Kafka消费者javadoc(我的亮点):

https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

控制消费者的地位

在大多数用例中,使用者只需从头到尾消费记录,定期提交其位置(自动或手动)。但是,Kafka允许使用者手动控制其位置,在分区中随意向前或向后移动。这意味着使用者可以重新使用较旧的记录,或跳到最新的记录,而无需实际使用中间记录。在一些情况下,手动控制消费者的位置可能很有用。

...

Kafka允许使用seek(TopicPartition,long)指定位置来指定新位置。还提供了查找服务器维护的最早和最新偏移量的特殊方法(分别请参见“集合”和“集合”)。

 类似资料:
  • 是否可以在没有实体的情况下使用JpaRepository?在这种情况下,将其替换为DTO。 如下示例所示 这种情况有替代方案吗? 注意:DTO已经映射,但我不想创建视图来将此DTO转换为实体。 我已经验证了这个主题,但没有重大进展,请使用无实体的JpaRepository交互样式 我在试这个 接口- 公共接口BffDTOInterface2{ } 我有这个错误

  • 我想使用并使其直接进入给定的url,而不是从ribbon配置中获取主机。 我知道在Spring,cloud-feign默认与ribbon和eureka一起出现。 根据这个:https://cloud.spring.io/spring-cloud-netflix/multi/multi_spring-cloud-ribbon.html#spring-cloud-ribbon-without-eure

  • 目前,我正在研究一个考勤应用程序的可行性,该应用程序具有为Android和iOS编写的以下功能。iOS-不能自动化。每次点击标签时,它都需要用户输入或单击。 然而,我没有很多关于Android NFC功能的谷歌搜索结果。我想在我的考勤应用程序中使用以下功能。 > 将员工ID写入标记或在员工记录中记录标记标识符(使用第三方应用程序) 考勤应用程序将安装和设置在Android手机上,并保存在塑料盒(考

  • 我看到的所有解决方案都需要使用。但是,我想在Eclipse之外的单个文件上使用CDT解析器。那有什么办法吗?

  • 在Spring Boot的文档中,我只找到了使用Redis会话的例子,不使用Redis也能使用它吗?