我们所有的30个主题都是用kafka中的10个分区创建的。我们正在按分区监控所有主题/group p-id的滞后。
我们正在使用Fluentd插件从kafka读取和路由日志。该插件是使用高级消费者实现的。我们为插件的单个主题配置了一些消费者,为多个主题配置了一些消费者。总的来说,除了3个主题之外,数据正在流经,没有问题。
问题是,对于正在处理的30个主题中的3个,我们发现分区滞后值不一致,即查看特定主题/组id的滞后值,某些分区的滞后值比其他分区高很多,有时高达30k。然而,对于其他27个主题,所有分区的滞后数保持一致,一个主题/组id的所有分区保持在彼此接近的范围内(例如,所有分区都在12和18之间)。
几乎每次我们重新启动Fluentd代理(它重新启动高级消费者)时,我们都会看到这三个主题的滞后开始平滑,有时它们会保持一致一段时间,然后滞后数字开始变得之字形。这只发生在3个主题上。但是当我们检查这三个主题的分布时,一切看起来都很正常。
我们对此感到茫然。高级使用者不编写用于管理从分区检索数据的代码。处理该部分的 kafka lib 是。使用者代码指定的所有内容都是线程数。我们已经尝试了 10、5,在所有情况下(尤其是 10 和 5 线程),这 3 个主题的滞后不一致不断出现。对于这些主题中的每一个,数据量都小于每小时 30k。
关于原因有什么建议吗?对此可以做什么?
非常感谢您的帮助。
根据所提供的细节,我将从以下几点开始,我想你应该已经看过了。
请务必让我知道,如果你发现任何更多的东西或解决一些微调问题
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。 或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。 我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。 所以3个问题, > 我怎么知道没有更多消息要从该主题中读取? 如果我对上述问题有答
我正在建立一个新的Kafka集群,为了测试目的,我创建了一个有1个分区和3个副本的主题。 有什么想法哪种配置或其他东西可以帮助我消费更多的数据吗?? 提前致谢
apache kafka文档提到以下内容: 如果所有使用者实例具有相同的使用者组,那么记录将有效地在使用者实例上进行负载平衡。 如果所有的使用者实例都有不同的使用者组,那么每个记录都将广播给所有的使用者进程。
我想检查手动分配给特定主题的消费者组的滞后,这可能吗。我使用的是Kafka-0.10.0.1。我用的是shKafka跑步课。shKafka。管理ConsumerGroupCommand-new consumer-description-bootstrap server localhost:9092-group test但它说不存在组,所以我想知道当我们手动分配分区时,是否可以检查使用者的延迟。
我们有一个基于spring boot的事务性Kafka制作人!使用的版本如下 spring-boot-starter-父-2.3.0。释放 spring-kafka-2.5.0。释放 我们的kafka(集群)版本是2.1. x! 作为生产者,我们启用了幂等性,定义了事务id前缀,并在事务中执行kafka模板调用。我们还有一个将隔离级别设置为只读的使用者! 现在我们遇到了一个行为,不知道如何推断,