我有一个 Kafka 应用程序,我一直在使用它 kafka-console-consumer.sh 使用消息,如下所示:
$./kafka-console-consumer.sh --zookeeper zookeeperhost:2181 --topic myTopic
它提供了我通过Kafka消费者给Kafka经纪人写的所有消息,没有任何遗漏。
最近,我将该应用程序部署在另一个环境中,因为某些原因,zookeperhost无法访问。所以我使用的是kafka简单的消费者外壳。sh,如下所示:
$./kafka-simple-consumer-shell.sh --broker-list brokerhost:9092 --topic myTopic --partition 0 --max-messages 1
但是有了这个,我看到很少的消息(大约5000个中有2-4个)会被遗漏。有人能解释一下kafka-simple-consumer-shell.sh是如何阅读信息的吗?
我怀疑可能某些消息会发送到某个不同的分区,因为我只是从分区 0 读取,所以我不是每次都收到所有消息。但是我不知道如何检查有多少个分区?其他分区的 ID 是什么?我尝试了 1,但它不起作用。
有人能帮忙吗?
kafka-simple-consumer.sh
只是创建一个从一个分区读取消息的使用者。因此,您的命令只是从 brokerhost:9092
读取 myTopic 分区 0
中的一条消息。如果分区 1 不在同一代理中,则它将无法像您所做的那样工作。(有关详细信息,请查看来自 GitHub 的代码)
如果您可以访问Zookeeper主机,那么只需使用
bin/kafka-topics.sh --describe --zookeeper zookeeperhost:2181 --topic myTopic
但是如果你不能访问动物园管理员的主机,我能想到的有两种方法。
broker-list
提供多个代理,格式为 broker1:port2,broker2:port2,broker3:port3。
然后,您可以计算出整个集群中存在多少个分区,但您仍然不知道哪个代理具有哪些分区。tmp/kafka-logs
(如果您使用的是默认日志目录)。你会发现像myTopic-0,myTopic-1
,...
采用主题分区#
的格式。您可以使用此功能手动检查哪个代理具有哪些分区。我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我
我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生
我是一个新的Kafka和使用Apache kafka消费者读取消息从生产者。但当我停下来开始一段时间。之间产生的所有消息都将丢失。如何处理这种情况。我正在使用这些属性“auto.offset.reset”、“latest”和“enable.auto.commit”、“false”。 这是我正在使用的代码。任何帮助都是感激的。
我正在尝试用Java实现一个简单的生产者-->Kafka-->消费者应用程序。我能够成功地生成和使用消息,但是当我重新启动消费者时,问题就出现了,其中一些已经使用的消息再次被消费者从Kafka中拾取(不是所有的消息,而是最近使用的一些消息)。 我已在我的使用者中设置了,并且我的属性设置为1000毫秒。 “重新传递一些已使用的消息”是一个已知的问题,还是有任何其他设置,我没有在这里? 基本上,有没有
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我需要想办法向Kafka要一份题目清单。我知道可以使用目录中包含的脚本来实现。一旦我有了这个列表,我需要每个主题的所有消费者。我在该目录中找不到脚本,在库中也找不到允许我这样做的类。 这背后的原因是,我需要弄清楚话题的偏移和消费者的偏移之间的区别。 有没有办法做到这一点?还是需要在每个消费者中实现此功能?