当前位置: 首页 > 知识库问答 >
问题:

使用简单使用者在Apache Kafka中读取未处理的消息

陈欣荣
2023-03-14

https://cwiki.apache.org/confluence/display/kafka/0.8.0+SimpleConsumer+example

使用SimpleConsumer来使用消息,但是在使用它时,我发现了一些突然的行为,如下所示:

使用者正在使用来自特定分区的消息。但问题是,当我的使用者运行并且我使用生产者将消息推送到主题时,它将使用来自该分区的消息。但是,如果我的消费者目前没有运行,我将一些消息推送到主题,然后再次启动消费者,它不会消费生产者推送的消息,但它再次准备消费现在将要推送的消息。我使用LatestTime()代替EarliestTime(),因为我只想使用未处理的消息。

案例-1

使用者正在运行:

制作者将M1,M2,M3消息推送到主题1的分区1

生产者现在将m4,m5,m6 messgae推到主题1的分区1

现在调用consumer

结果:消费者不消费messgaes m4、m5、m6,但如果我要检查偏移量,那么它被设置为7。这意味着生产者在产生消息时已经将偏移量提前到7,结果消费者现在将使用偏移量为7的消息

请帮助理想情况下,当消费者再次出现时,它应该读取来自M4的消息。

共有1个答案

左翰海
2023-03-14

你做错了。

首先,我不确定SimpleConsumer是您要找的东西。它迫使您自己管理偏移量(例如,它根本不向Zookeeper提交偏移量,并且每当您再次启动SimpleConsumer时,它将再次获取相同的消息)。SimpleConsumer不了解“已处理的消息”。它所能做的就是从某个偏移量开始获取,然后继续获取,直到你说“停止”。

无论如何,如果您打算自己提交处理过的偏移量,您应该使用earleSttime(auto.offset.reset=smallest配置项)。auto.offset.reset意味着如果使用错误的偏移量初始化消费者(如果我没记错的话,SimpleConsumer是使用-1偏移量初始化的,这显然是错误的),它将重置为可用的最小(earlesttime)或argest(latesttime)偏移量。

为了更清楚地说明,下面是一个示例:

您的案例-1:

创建一个使用者并将其指向主题1分区1。因为它最初是用错误的偏移量初始化的,所以它会要求代理提供一些适当的偏移量(这里是最小最大偏移量重置)。如果您还没有生成任何消息,最小最大的偏移量都为0,因此当您生成一些消息时,您的使用者将获取这些消息。

案例2:

您将生成N条消息(例如7)。然后启动SimpleConsumer。同样,它用错误的偏移量初始化,并向代理请求适当的偏移量。对于最小重置偏移量,它将为0,对于最大偏移量,它将为7。在您的示例中,您使用largestoffsets,您的使用者将使用offse7重新初始化并开始使用它。

一般来说,看看高级消费者,在大多数情况下,这是你要找的。这是链接

 类似资料:
  • 我为RabbitMQ制作了一个消费者,作为一个用C#.NET编写的控制台应用程序。它被编程为永久监听队列,每当它在队列中发现消息时,它就处理它。使用者平均每秒处理35条消息。使用者被安排在系统启动时在任务计划程序中运行。消费者运行良好的3-4天。但是,它们继续运行,但不处理任何消息,尽管队列中有消息。当使用者停止并再次启动时,它再次开始正确处理消息。但是,当您手动重新启动时,数以百万计的消息排在队

  • 问题内容: 我有一个简单的Java生产者,如下所示 我正在尝试读取以下数据 但是消费者没有阅读来自kafka的任何消息。如果我在下面添加以下内容 然后,消费者开始阅读该主题。但是,每当使用者重新启动时,它都会从我不希望的主题开头读取消息。如果我在启动Consumer时添加以下配置 然后它从主题读取消息,但是如果使用者在处理所有消息之前重新启动,则它不会读取未处理的消息。 有人可以让我知道出了什么问

  • 我正在使用来使用来自spring-boot应用程序中某个主题的消息,我需要定期运行该应用程序。spring-kafka版本是2.2.4.发行版。

  • 因此,根据我对Apache Kafka中事务的理解,read_committed消费者不会返回作为正在进行的事务一部分的消息。因此,我猜想,消费者可以选择将其偏移量提交给那些正在进行的事务消息(例如,读取非事务消息),或者可以选择在提交/中止遇到的事务之前不进一步推进。我只是假设(Kafka)允许跳过那些挂起的交易记录,但考虑到它的抵消可能已经很远了,那么消费者在提交时将如何读取它们呢? 更新 考

  • 我试图从__consumer_offsets主题中使用,因为这似乎是检索关于消费者的kafka度量(如消息滞后等)的最简单的方法。理想的方法是从jmx访问它,但希望先尝试一下,返回的消息似乎是加密的或不可读的。尝试添加stringDeserializer属性。有没有人对如何纠正这一点有什么建议?这里的提法也是重复的 重复的consumer_offset 没有帮助,因为它没有引用我的问题,即在Jav

  • 我已经研究阿帕奇Kafka一段时间了。 让我们考虑下面的例子。 考虑到我有3个分区的主题。我只有一个生产者和一个消费者。我在生成消息时没有指定key属性。 所以我知道在生产者方面,当我发布一条消息时,kafka使用的策略是将消息分配给这两个分区中的任何一个。 现在,我想知道的是,当我开始一个属于某个消费者群体的消费者听同一主题时,它将使用什么策略来从不同的参与者(因为有3个)中提取信息? 它是否会