给定主题名、分区号和偏移量,我如何从该主题中读取一条记录?
在基于Sprng引导的应用程序中,我使用Kafka导入业务数据。导入记录被发送到导入队列,并被一个或多个业务模块使用。即使消费者未能从记录中导入数据以继续从以下记录中导入数据,记录也会始终得到确认。
稍后,用户(在他/她修复了一些相关的业务数据后)可以决定重新发送一个或多个失败(但已确认)的导入记录。
每个记录的偏移量、分区号和主题名都存储在我的应用程序内部的SQL数据库中。
从参考文档和一些StackOverflow问题中,我发现我必须:
这是阅读Kafka主题中的一条旧记录的唯一方法吗?或者有更简单的解决方案吗?
正如@Gary所建议的:
ConsumerRecord<byte[], byte[]> read(String topic, int partition, long offset) {
Map<String, Object> configs = Map.of(
"bootstrap.servers", "localhost:9092",
"group.id", "incubator_retry",
"max.poll.records", 1);
DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(
configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(5000));
if (consumerRecords.isEmpty()) {
throw new RuntimeException(String.format("Timeout polling from topic %s partition %d at offset %d",
topicPartition.topic(), topicPartition.partition(), offset));
}
return consumerRecords.iterator().next();
}
}
有一个更简单的解决方案。
Default消费者工厂
创建Kafka消费者
(或简单地创建一个)group.id
max.poll.records
属性设置为1consumer.assign(...)
所需的主题/分区查找(...)
到所需的偏移量投票(...)
直到您获得记录关闭()
消费者如果要使用任何消息转换(除了Kafka反序列化程序),则必须手动调用转换器。
有一种情况,当消费者1阅读来自Kafka主题的消息时。当使用相同的groupId连接第二个用户2时,需要重新平衡分区。有没有可能以某种方式重置偏移,以便在重新平衡过程之后,两个消费者都从头开始阅读主题?
我阅读了Kafka的所有文档,我读到的唯一方法是git和指定 但是为了客户的订单,我需要使用Spring,所以我的Kafkaendpoint是这样的 但是得到一个例外 无法为属性找到合适的setter:offsetRepository,因为没有具有相同类型的setter方法:java.lang.String也不可能进行类型转换:没有类型转换器可用于从类型转换:java.lang.String到所需
我对Kafka和Spring Boot是一种新的体验,并试图使我的应用程序从主题的特定分区读取。 单厂代码 这也是我的消费者工厂配置 当我试图运行程序时,它给我一个错误 分区Single上的偏移量提交失败。偏移量308处的Attendance-0:协调器不知道此成员。 和警告 失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.p
问题内容: 嘿,我正在尝试打开文件,仅从偏移量读取一定长度!我阅读了以下主题: 如何使用Java中的文件中的特定行号读取特定行? 在那儿,它说在不读取之前就不可能读取某行,但是我想知道字节! 是否可以从已知偏移量读取某些字节? 问题答案: RandomAccessFile提供一个功能:
问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终
我需要打印/记录/存储处理消息的kafka分区和偏移量。我如何才能做到这一点?我使用StreamBridge从制作人那里发送消息,还使用功能性spring kafka streams方法