当前位置: 首页 > 知识库问答 >
问题:

Kafka consumer offsetForTimes方法只返回很少的分区偏移量位置不是全部

董谦
2023-03-14

我有一个有8个分区的Kafka主题,从单个消费者订阅该主题,我有唯一的消费者组为消费者。现在,我尝试只使用来自所有分区的最近消息(在我的例子中,是从当前时间开始的3分钟前)。我使用了如下所示的offsetForTimes方法。

List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartions = partitionInfos.stream().......collect(Collectors.toList());
Long value = Instant.now().minus(120,ChronoUnit.SECONDS).toEpochMillis();
Map<TopicPartion,Long> topicPartitionTime = topicPartions.stream().collect(COllectors.toMap(tp -> tp,(value)));
Map<TopicPartition, OffsetAndTimeStamp> offsets = consumer.offsetsForTimes(topicPartitionTime);

现在的问题是offsetsForTimes只返回一个或两个分区偏移量位置,并返回null表示剩余位置。

我想使用所有分区最近的消息,而不是一个或两个分区。

我在下面也试过了

consumer.unsubscribe();
consumer.assign(allPartitions);
Map<TopicPartition, OffsetAndTimeStamp> offsets = consumer.offsetsForTimes(topicPartitionTime);
    null

使用apache kafka Version-2.11-2.2.0kafka clients JAR-2.0.1

感激事先的帮助。

共有1个答案

高祺
2023-03-14

我不能再现你的情况;唯一一次为偏移量获得null是当该分区没有提交的偏移量时。例如,我有10个分区,但只写到8:

@SpringBootApplication
public class So59200574Application implements ConsumerSeekAware {

    public static void main(String[] args) {
        SpringApplication.run(So59200574Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so59200574").partitions(10).replicas(1).build();
    }

    @KafkaListener(id = "so59200574", topics = "so59200574")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ConsumerAwareRebalanceListener rebal() {
        return new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                final long tenSecondsAgo = System.currentTimeMillis() - 10_000L;
                partitions.forEach(tp -> timestampsToSearch.computeIfAbsent(tp, tp1 -> tenSecondsAgo));
                System.out.println(consumer.offsetsForTimes(timestampsToSearch));
            }

        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0, 8).forEach(i -> template.send("so59200574", i, null, "foo" + i));
    }

}
 类似资料:
  • oracle DB是否能够返回时区区域,例如欧洲/伦敦,而不仅仅是偏移?我想知道服务器所在的地区名称。 从DUAL中选择SYSTIMESTAMP参数;返回日期、时间和偏移量: 从dual中选择DBTIMEZONE;返回偏移量:

  • 本文向大家介绍如何以分钟为单位返回当前语言环境的时区偏移量?,包括了如何以分钟为单位返回当前语言环境的时区偏移量?的使用技巧和注意事项,需要的朋友参考一下 JavaScript date getTimezoneOffset()方法以分钟为单位返回当前语言环境的时区偏移量。时区偏移量是分钟差。格林威治标准时间(GMT)相对于当地时间。 示例 您可以尝试运行以下代码以分钟为单位返回时间一偏移量-

  • 我有一个由5个分区组成的主题如下: 似乎分区的偏移量非常接近其余分区的偏移量之和。我不知道如何以及为什么。

  • 问题内容: 我有一个遗留应用程序,我将需要补充一些数据。当前,我们有一个数据库表,用于存储美国(及其地区)邮政编码,GMT偏移量以及一个标志,用于显示该邮政编码是否使用夏时制。这是从某个免费提供商处下载的,我现在找不到源。 现在,我需要用每个邮政编码的完整奥尔森名称(例如)来补充此表,因为这似乎是将存储在数据库中的给定日期/时间(对于该购买者而言本地的)转换为对象的唯一好方法。 看一下桌子: 在另

  • 将kafka consumer offset重置为“最早”时,它会保留一些带有偏移量的分区 显示: 为什么分区1也没有0?

  • 在我的应用程序中,有一个计算得到两个日期之间的天数。计算如下所示; 此方法在我的本地pc(windows 7,64位)上返回正确的值(即2015-03-10 00:00:00.0、2015-03-02 00:00:00.0,从存储库中检索的值为8,并传递给该方法)。但是,此方法在位于加拿大的pc(Windows 7,64位)上返回不正确的值(即2015-03-10 00:00:00.0、2015-