我试图了解如何跟踪Kafka的信息摄取。
我们现在遵循的工作流程是清除主题中的所有消息,然后我们用代码更改重新摄取。我需要知道那些代码更改有多成功。在当前状态下,我正在使用Kafka工具,手动刷新消息总数,并将结果保存在csv中,我知道这是不可持续的长期。
你对自动获取Kafka主题中的消息计数有什么建议?理想情况下,我想击中的主题一分钟一分钟的频率,并得到计数,以及窗口的时间,如1天等。
消息计数也取决于日志压缩。
例如,当日志压缩对一个主题有效时,您可能会观察到“奇怪的结果”。假设您有一个主题mytopic
,它总共有100条消息。假设您有一个现在生效的日志压缩策略,计数可能会减少到20条消息,因为旧的消息已经压缩。
为了获得每个分区的消息计数,可以使用以下命令:
kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic myTopic
myTopic:2:34
myTopic:1:33
myTopic:0:33
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:2181 \
--topic myTopic \
--time -1 \
--offsets 1 \
| awk -F ":" '{sum += $3} END {print sum}'
我使用的是0.9.0.0版本的Kafka,我想在不使用管理脚本Kafka-console-consumer.sh的情况下计算主题中的消息数。 我已经尝试了答案Java“How to get number of messages in a topic in apache kafka”中的所有命令,但都没有结果。有人能帮我吗?
我需要一个Kafka主题存储的消息数量。这与任何消费者是否消费了消息无关。 以上是否等于Kafka主题中当前存储的消息数?
我是使用Kafka的新手,我有一个问题。如果我知道主题、偏移量和分区,我可以只删除一条消息吗?如果没有,还有什么其他选择吗?
下面的日志每次都在后面。(同一条消息的错误几乎是30倍) 我试着重新启动Kafka和Spring Boot应用程序,仍然是同样的问题。 日志: 听者 > 有什么解决方案可以停止收听错误消息吗?我使用了seekErrorHandler它有时只工作,有任何配置问题吗? Kafka和Spring的Kafka有什么问题吗? 如何解决这一问题?
我正在测试Kafka主题的工作,但我不明白删除是如何工作的。 我创建了一个简单的主题 在此之后,我创建了一个生产者,并发送了一些消息。消费者没有问题地接收到消息。但我预计,一分钟后,如果重复了消费者,它不会显示消息,因为它们肯定已经被删除了。但这种行为不会发生。 如果我用ksql创建一个查询,那也是一样的。消息总是会出现。 我想我不明白删除是怎么回事。 3)制作人留言 4)消费者 消费者会显示这四
正如标题中所说的,我想在我的主题中获得一些记录,但我无法找到一个使用kafka-python库的解决方案。有人知道吗?