当前位置: 首页 > 知识库问答 >
问题:

在Apache Kafka中读取消息偏移量

赫连靖琪
2023-03-14

我对Kafka非常陌生,我们正在使用Kafka0.8.1

我需要做的是使用来自主题的消息。为此,我必须用Java编写一个消费者,它将消费来自主题的消息,然后将该消息保存到数据库。保存消息后,将向Java消费者发送一些确认。如果确认为true,则应使用主题中的下一条消息。如果AcknowlDement为false(这意味着由于某些错误消息,从主题读取的信息无法保存到数据库中),则应再次读取该消息。

我认为我需要使用<code>简单消费者

在本例中,偏移量在run方法中计算为“readOffset”。我需要玩吗?例如,我可以使用<code>LatestTime()之前的偏移量。

我应该这样吗?

共有1个答案

堵雅健
2023-03-14

我认为你可以使用高级消费者(http://kafka.apache.org/documentation.html#highlevelconsumerapi),它应该比简单消费者更容易使用。我认为消费者不需要重读Kafka关于数据库故障的消息,因为消费者已经有了这些消息,可以将它们重新发送到数据库或做任何它认为合适的事情。

高级使用者将从特定分区读取的最后偏移量存储在Zookeeper中(基于使用者组名称),以便当使用者进程死亡并随后重新启动时(可能在其他主机上),它可以继续处理中断的消息。可以定期将此偏移量自动保存到Zookeeper(请参阅使用者属性auto.commit.enable和auto.commit.interval.ms),或者通过调用ConsumerConnector.commitOffsets由应用程序逻辑保存。另请参见https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例。

我建议您关闭自动提交,并在收到数据库确认后自己提交偏移量。因此,您可以确保在消费者失败的情况下从Kafka重读未处理的消息,并且提交给Kafka的所有消息最终至少会到达数据库一次(但不是“一次”)。

 类似资料:
  • 我创建了以批处理方式接收消息的ConsumerConfig: Spring启动配置: 侦听器类 : 我在处理消息后使用手动确认。 我找到了一些调试日志: 在上面的调试日志中,***获取偏移量发生在偏移量提交之前,该偏移量未提交,因此它返回offset_OUT_OF_RANGE,之后使用者无法接收任何消息。是否有任何方法处理使用者代码中的此错误,或如何仅在提交后获取偏移量****

  • 我已经开始让我的制作人向Kafka发送数据,也让我的消费者提取相同的数据。当我在ApacheNIFI中使用ConsumerKafka处理器(kafka版本1.0)时,我脑海中很少有与kafka consumer相关的查询。 Q.1)当我第一次启动ConsumeKafka处理器时,我如何从开始和当前消息中读取消息? 问题2)以及在Kafka消费者关闭的情况下,如何在最后一条消费信息之后阅读信息? 在

  • 我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2

  • 如有任何帮助,我们将不胜感激。

  • 我是流媒体代理(如Kafka)的新手,来自排队消息系统(如JMS、Rabbit MQ)。 我从Kafka文档中读到,消息作为记录存储在Kafka分区的偏移量中。消费者从偏移量读取。 消息和记录有什么区别[多个/部分消息是否构成记录?] 当消费者从偏移量读取时,消费者是否有可能读取部分消息?消费者是否需要基于某种逻辑将这些对等消息串起来? 或 1条消息=1条记录=1个偏移量 之所以会出现这个问题,是

  • 我正在探索ChronicleQueue来保存我的一个应用程序中生成的事件。我想在经过一些处理后,将保存的事件按其原始发生顺序发布到不同的系统。我有多个应用程序实例,每个实例都可以运行一个单线程appender来将事件添加到ChronicleQueue。尽管跨实例排序是必要的,但我想理解以下两个问题。 2)还需要一些建议,如果有一种方法来保持跨实例的事件顺序。