当前位置: 首页 > 知识库问答 >
问题:

如何确定主题已被Kafka Stream应用程序从第一个偏移量到最后一个偏移量从Java应用程序完全阅读

诸俊才
2023-03-14

我在Kafka Streams中需要一些帮助。我已经启动了一个Kafka流应用程序,它从第一个偏移量流式传输一个主题。主题的数据非常庞大,所以我想在我的应用程序中实现一种机制,使用Kafka流,这样我就可以在主题被完全读取到最后一个偏移量时得到通知。

我已经阅读了Kafka Streams 2.8.0 api,我找到了一个api方法i-e allLocalStore分区延迟,它将存储名称映射返回到另一个分区映射,其中包含每个分区的所有滞后信息。此方法返回此Streams本地的所有存储分区(活动或备用)的滞后信息。在上面的情况下,当我有一个节点运行该流应用程序时,此方法对我非常有用。

但是在我的例子中,系统是分布式的,应用程序节点是3个,主题分区是10个,这意味着每个节点至少有3个分区供主题读取。

我在这里需要帮助。我如何实现此功能,当主题从分区 0 完全读取到分区 9 时,我可以收到通知。请注意,到目前为止,我无法在此处使用数据库。

实现目标的其他方法也是受欢迎的。谢谢你。

共有1个答案

洪育
2023-03-14

我能够从管理员客户端 API 获得滞后信息。下面的代码结果针对给定流应用程序 i-e applicationId 读取的主题结束每个分区的偏移量和当前偏移量。

AdminClient adminClient = AdminClient.create(kafkaProperties);
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(applicationId);

// Current offsets.
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();

// all topic partitions.
Set<TopicPartition> topicPartitions = topicPartitionOffsetAndMetadataMap.keySet();
// list of end offsets for each partitions.
ListOffsetsResult listOffsetsResult = adminClient.listOffsets(topicPartitions.stream()
    .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())));
    
 类似资料:
  • 可以从输入主题的特定偏移量到结束偏移量进行Kafka流处理吗? 我有一个Kafka流应用程序消耗输入主题,但由于某种原因失败了。我修复了问题并再次启动它,但它从输入主题的最新偏移量开始消耗。我知道应用程序已处理的输入主题的偏移量。现在,我如何将输入主题从一个偏移量处理到另一个偏移量。我正在使用合流平台5.1.2。

  • 我阅读了Kafka的所有文档,我读到的唯一方法是git和指定 但是为了客户的订单,我需要使用Spring,所以我的Kafkaendpoint是这样的 但是得到一个例外 无法为属性找到合适的setter:offsetRepository,因为没有具有相同类型的setter方法:java.lang.String也不可能进行类型转换:没有类型转换器可用于从类型转换:java.lang.String到所需

  • 问题内容: 我想要类似通用,可重用的方法,该方法将告诉我从流的起点读取的字节数。理想情况下,我希望它可以与所有InputStream一起使用,这样当我从不同的来源获得它们时,就不必包装它们中的每一个。 这样的野兽存在吗?如果不是,那么有人可以推荐现有的计数实现吗? 问题答案: 看一下Commons IO包中的CountingInputStream。它们也很好地收集了其他有用的InputStream

  • 问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人

  • 我想开始最新的抵消,而不是为旧的价值所困扰。是否有可能重置该组的偏移量?

  • 为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?