当前位置: 首页 > 知识库问答 >
问题:

如何使用spring kafka从给定的主题和分区以特定的偏移量重新发送(读取)旧kafka消息?

蓬运诚
2023-03-14

给定主题名、分区号和偏移量,我如何从该主题中读取一条记录?

在基于Sprng引导的应用程序中,我使用Kafka导入业务数据。导入记录被发送到导入队列,并被一个或多个业务模块使用。即使消费者未能从记录中导入数据以继续从以下记录中导入数据,记录也会始终得到确认。

稍后,用户(在他/她修复了一些相关的业务数据后)可以决定重新发送一个或多个失败(但已确认)的导入记录。

每个记录的偏移量、分区号和主题名都存储在我的应用程序内部的SQL数据库中。

从参考文档和一些StackOverflow问题中,我发现我必须:

  1. 设置容器(消费者/监听器)
  2. 倒带(查找)到所需偏移量
  3. 读一条记录
  4. 跳过读取剩余记录

这是阅读Kafka主题中的一条旧记录的唯一方法吗?或者有更简单的解决方案吗?

正如@Gary所建议的:

ConsumerRecord<byte[], byte[]> read(String topic, int partition, long offset) {
    Map<String, Object> configs = Map.of(
            "bootstrap.servers", "localhost:9092",
            "group.id", "incubator_retry",
            "max.poll.records", 1);
    DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(
            configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());

    try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.assign(List.of(topicPartition));
        consumer.seek(topicPartition, offset);
        ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(5000));
        if (consumerRecords.isEmpty()) {
            throw new RuntimeException(String.format("Timeout polling from topic %s partition %d at offset %d",
                    topicPartition.topic(), topicPartition.partition(), offset));
        }
        return consumerRecords.iterator().next();
    }
}

共有1个答案

赵高雅
2023-03-14

有一个更简单的解决方案。

  • 使用Default消费者工厂创建Kafka消费者(或简单地创建一个)
  • 使用不同的group.id
  • max.poll.records属性设置为1
  • consumer.assign(...)所需的主题/分区
  • 查找(...)到所需的偏移量
  • 投票(...)直到您获得记录
  • 关闭()消费者

如果要使用任何消息转换(除了Kafka反序列化程序),则必须手动调用转换器。

 类似资料:
  • 有一种情况,当消费者1阅读来自Kafka主题的消息时。当使用相同的groupId连接第二个用户2时,需要重新平衡分区。有没有可能以某种方式重置偏移,以便在重新平衡过程之后,两个消费者都从头开始阅读主题?

  • 我阅读了Kafka的所有文档,我读到的唯一方法是git和指定 但是为了客户的订单,我需要使用Spring,所以我的Kafkaendpoint是这样的 但是得到一个例外 无法为属性找到合适的setter:offsetRepository,因为没有具有相同类型的setter方法:java.lang.String也不可能进行类型转换:没有类型转换器可用于从类型转换:java.lang.String到所需

  • 我对Kafka和Spring Boot是一种新的体验,并试图使我的应用程序从主题的特定分区读取。 单厂代码 这也是我的消费者工厂配置 当我试图运行程序时,它给我一个错误 分区Single上的偏移量提交失败。偏移量308处的Attendance-0:协调器不知道此成员。 和警告 失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.p

  • 问题内容: 嘿,我正在尝试打开文件,仅从偏移量读取一定长度!我阅读了以下主题: 如何使用Java中的文件中的特定行号读取特定行? 在那儿,它说在不读取之前就不可能读取某行,但是我想知道字节! 是否可以从已知偏移量读取某些字节? 问题答案: RandomAccessFile提供一个功能:

  • 问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终

  • 我需要打印/记录/存储处理消息的kafka分区和偏移量。我如何才能做到这一点?我使用StreamBridge从制作人那里发送消息,还使用功能性spring kafka streams方法