当前位置: 首页 > 知识库问答 >
问题:

Kafka简单消费者间歇性丢失消息

汝承载
2023-03-14

我有一个 Kafka 应用程序,我一直在使用它 kafka-console-consumer.sh 使用消息,如下所示:

$./kafka-console-consumer.sh --zookeeper zookeeperhost:2181 --topic myTopic

它提供了我通过Kafka消费者给Kafka经纪人写的所有消息,没有任何遗漏。

最近,我将该应用程序部署在另一个环境中,因为某些原因,zookeperhost无法访问。所以我使用的是kafka简单的消费者外壳。sh,如下所示:

$./kafka-simple-consumer-shell.sh --broker-list brokerhost:9092 --topic myTopic --partition 0 --max-messages 1

但是有了这个,我看到很少的消息(大约5000个中有2-4个)会被遗漏。有人能解释一下kafka-simple-consumer-shell.sh是如何阅读信息的吗?

我怀疑可能某些消息会发送到某个不同的分区,因为我只是从分区 0 读取,所以我不是每次都收到所有消息。但是我不知道如何检查有多少个分区?其他分区的 ID 是什么?我尝试了 1,但它不起作用。

有人能帮忙吗?

共有1个答案

晋坚
2023-03-14

kafka-simple-consumer.sh 只是创建一个从一个分区读取消息的使用者。因此,您的命令只是从 brokerhost:9092 读取 myTopic 分区 0 中的一条消息。如果分区 1 不在同一代理中,则它将无法像您所做的那样工作。(有关详细信息,请查看来自 GitHub 的代码)

如果您可以访问Zookeeper主机,那么只需使用

bin/kafka-topics.sh --describe --zookeeper zookeeperhost:2181 --topic myTopic

但是如果你不能访问动物园管理员的主机,我能想到的有两种方法

  1. 提供一个将所有代理作为参数的列表,并尝试从 0 到 N 的分区号。您可以向 --broker-list 提供多个代理,格式为 broker1:port2,broker2:port2,broker3:port3。然后,您可以计算出整个集群中存在多少个分区,但您仍然不知道哪个代理具有哪些分区。
  2. 手动检查每个代理的日志目录。检查 /tmp/kafka-logs(如果您使用的是默认日志目录)。你会发现像myTopic-0,myTopic-1...采用主题分区#的格式。您可以使用此功能手动检查哪个代理具有哪些分区。
 类似资料:
  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我

  • 我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生

  • 我是一个新的Kafka和使用Apache kafka消费者读取消息从生产者。但当我停下来开始一段时间。之间产生的所有消息都将丢失。如何处理这种情况。我正在使用这些属性“auto.offset.reset”、“latest”和“enable.auto.commit”、“false”。 这是我正在使用的代码。任何帮助都是感激的。

  • 我正在尝试用Java实现一个简单的生产者-->Kafka-->消费者应用程序。我能够成功地生成和使用消息,但是当我重新启动消费者时,问题就出现了,其中一些已经使用的消息再次被消费者从Kafka中拾取(不是所有的消息,而是最近使用的一些消息)。 我已在我的使用者中设置了,并且我的属性设置为1000毫秒。 “重新传递一些已使用的消息”是一个已知的问题,还是有任何其他设置,我没有在这里? 基本上,有没有

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我需要想办法向Kafka要一份题目清单。我知道可以使用目录中包含的脚本来实现。一旦我有了这个列表,我需要每个主题的所有消费者。我在该目录中找不到脚本,在库中也找不到允许我这样做的类。 这背后的原因是,我需要弄清楚话题的偏移和消费者的偏移之间的区别。 有没有办法做到这一点?还是需要在每个消费者中实现此功能?