Getting records from stream --> dump it into a table --> Fetch records and call API --> API will update records into a table --> calling Async Commit()
在某些情况下,API处理花费了更多的时间,因为获取了更多的记录,我们得到了以下错误?
成员consumer-prov-em-1-399ede46-9e12-4388-b5b8-f198a4e6a5bc向协调器apslt2555.uhc.com:9095(ID:2147483577 rack:null)发送离开组请求已过期。这意味着对poll()的后续调用之间的时间长于配置的max.poll.interval.ms,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加max.poll.interval.ms或通过减少poll()中返回的批的最大大小和max.poll.records来解决这一问题。
CommitFailedException:无法完成提交,因为组已经重新平衡并将分区分配给了另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加max.poll.interval.ms或通过减少poll()中返回的批的最大大小和max.poll.records来解决这一问题。
我是否还必须增加max.poll.interval.ms。大概10分钟吧。在改变这些值时,我是否应该注意到任何向下的影响?除了这些参数,还有没有其他方法来处理这些错误呢?
max.poll.records
允许批处理消耗模型,在此模型中,记录在刷新到另一个系统之前先在内存中收集。其思想是通过从kafka轮询获得所有记录,然后在轮询循环中在内存中处理这些记录。
如果您减少这个数字,那么消费者将更频繁地从Kafka进行轮询。这意味着它需要更频繁地进行网络调用。这可能会降低kafka流处理的性能。
max.poll.interval.ms
控制使用者主动离开组之前轮询调用之间的最长时间。如果这个数字增加,那么Kafaka将需要更长的时间来检测消费者的失败。另一方面,如果这个值太低,Kafaka可能会错误地发现许多活着的消费者失败了,从而更经常地进行再平衡。
我正在建立一个新的Kafka集群,为了测试目的,我创建了一个有1个分区和3个副本的主题。 有什么想法哪种配置或其他东西可以帮助我消费更多的数据吗?? 提前致谢
我有两个Kafka监听器组件,每个组件监听不同的主题并期待不同的有效负载。我的问题是,我可以对两者使用相同的客户端id吗?还是必须使用不同的客户端id?如果客户端id必须不同,我想了解一个可以有效使用客户端id的用例。
我有几个连接到Kafka集群的消费者,但我无法控制。同时,我想了解这些消费者是如何配置的。 有没有一个API可以列出所有的消费者(如果有发布者的话,这是一个额外的好处),然后读取他们所有的配置?我说的是这些消费者设置: https://docs . confluent . io/current/installation/configuration/consumer-configs . html #
对于我的测试,我在队列中发布了700万条消息。我创建了一个包含30个消费者线程消费者组,每个分区一个。我最初的印象是,与通过SQS获得的相比,这将大大加快处理能力。不幸的是,情况并非如此。在我的例子中,数据处理是复杂的,平均需要1-2分钟才能完成,这导致了一系列分区重新平衡,因为线程不能按时运行。我在日志里看到一堆消息 组FULL_GROUP的自动偏移量提交失败:无法完成提交,因为该组已重新平衡并
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?