例如,分区有1-10的偏移量。我只想从3-8消费。在消耗了第8条消息后,程序应该退出。
是的,kafka没有提供这个功能,但是您可以在您的消费者代码中实现这个功能。您可以尝试使用commitsync()
来控制这一点。
公共无效commitSync(映射偏移)
为指定的主题和分区列表提交指定的偏移量。这就抵消了Kafka。使用该API提交的偏移量将在每次重新平衡后的第一次提取中使用,也将在启动时使用。因此,如果您需要在Kafka以外的任何地方存储偏移量,则不应使用此API。提交的偏移量应该是应用程序将要使用的下一个消息,即lastProcessedMessageOffset+1。
while (goAhead) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
if (record.offset() > OFFSET_BOUND) {
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())));
goAhead = false;
break;
}
process(record);
}
}
有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。
我正在使用Kafka 0.9新的消费者API。 我让Kafka为消费者负责补偿。我让消费者在多台机器上阅读相同的主题。 我试图找出以下内容: 在消费者组中注册的消费者 每个消费者的偏移量 我以为消费者-群体-消费者关系会存储在ZooKeeper中。我在ZooKeeper中看到了消费者节点,它没有孩子。 通过查看代码,我可以知道的偏移量被写入了kafka,但我不知道它们被写入了哪个主题?
我相信这三种类型的确认由于生产者属性仅限于领导者和生产者,我希望生产者在消费者通过kafka broker消费来自存储/队列的消息时收到具体的消息。还请纠正我,如果我在制作人的“acks”属性上有错误,它的默认值是“-1”,它确认所有副本是否已接收/存储消息,但它是否与消费者有关,或者我们是否可以在消费者提交且Kafka向制作人发送确认时创建一个桥梁?
我正在尝试使用Kafka-Python编写一个消费者,以确保精确的once语义。分区中的消息是使用事务感知生成器生成的。我从Kafka文档中了解到,我应该将指定为,这样它将只读取提交的消息。问题是,我在Python客户机的文档中没有看到关于如何指定的任何地方。关于如何让我的消费者只阅读提交的消息,有什么想法吗? 预期结果:只需获取transation committed消息实际结果:使用者甚至读取
现在,我有一个Spring Boot CLI应用程序,当应用程序启动时,它会自动启动Kafka消费者。我的任务是更新提供API的应用程序,允许在特定条件下启动或停止Kafka消费者。所以,我将使用SpringBootStarterWeb创建该API。但我找不到一种方法来手动管理消费过程。我需要的是 在不使用消费者的情况下启动API 关于如何手动管理消费过程的任何建议? 技术细节: 用于创建侦听器
这是一个场景:我知道,使用与Spring kafka相关的最新API(如Spring集成kafka 2.10),我们可以执行以下操作: 以及来自与相同kafka主题相关的不同分区的读取。 我想知道我们是否可以使用同样的方法,例如spsping-集成-Kafka1.3.1 我没有找到任何关于如何做到这一点的提示(我对xml版本很感兴趣)。