当前位置: 首页 > 知识库问答 >
问题:

库伯内特的Kafka流:重新部署后的长期再平衡

席兴平
2023-03-14

我们使用StatefulSet在Kubernetes上部署Scala Kafka Streams应用程序。实例具有单独的applicationids,因此它们每个都复制完整的输入主题,以实现容错。它们本质上是只读服务,只读取一个状态主题并将其写入状态存储,客户请求通过REST从那里得到服务。这意味着,在任何给定时间,消费者组总是仅由单个Kafka Streams实例组成。

现在我们的问题是,当触发滚动重启时,每个实例大约需要5分钟才能启动,其中大部分时间花费在rebalancing状态中等待。我在这里读到过,Kafka Streams不会发送LeaveGroup请求,以便在容器重新启动后快速返回,而不会重新平衡。为什么这对我们不起作用,为什么即使applicationid是相同的,重新平衡也需要这么长的时间?理想情况下,为了最大限度地减少停机时间,应用程序应该从重新启动时离开的地方立即接管。

properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), "300000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest")
// RocksDB config, see https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedMemoryRocksDBConfig])    
  • 减少会话.timeout.ms会有帮助吗?我们将其设置为相当大的值,因为Kafka代理居住在不同的数据中心,网络连接有时不是超级可靠的。
  • 此答案建议减少max.poll.interval.ms,因为它与重新平衡超时有关。那是正确的吗?我在犹豫是否要改变这个,因为它可能会对我们应用程序的正常操作模式产生影响。
  • 提到了一个config组.initial.rebalance.delay.ms来延迟部署期间的重新平衡-但这也会在从崩溃中恢复后导致延迟,是吗?
  • 我还偶然发现了KIP-345,它的目标是完全通过group.instance.id来消除静态成员身份的消费者再平衡,这将非常适合我们的用户案例,但它似乎还没有在我们的代理上提供。

我对众多的配置以及如何使用它们来实现更新后的快速恢复感到困惑。有人能解释一下他们是怎么一起玩的吗?

共有1个答案

鲜于浩淼
2023-03-14

你提到的另一个问题并不是说在重启时避免了再平衡。不发送leavegrouprequest只会避免停止应用程序时的重新平衡。因此,再平衡的次数从两次减少到一次。当然,对于您的有点不同寻常的单实例部署,您在这里没有任何收获(事实上,它实际上可能“伤害”您……)

减少session.timeout.ms会有帮助吗?我们将其设置为相当大的值,因为Kafka经纪人生活在不同的数据中心,网络连接有时不是超级可靠的。

可能是,这取决于你重启应用程序的速度。(详细信息请参阅下文。)也许只是尝试一下(例如,设置为3分钟,仍然有一个高值的稳定性,并看到它的再平衡时间下降到3分钟?

max.poll.interval.ms也会影响再平衡时间(更多详细信息见下文)。但是,默认值为30秒,因此不应导致5分钟的重新平衡时间。

这里提到了配置组p.initial.rebalance.delay.ms来延迟部署期间的重新平衡--但这也会在从崩溃中恢复后导致延迟,不是吗?

这只适用于空消费者组,默认值仅为3秒。所以应该不会影响你。

我还偶然发现了KIP-345,它的目标是完全通过group.instance.id来消除静态成员身份的消费者再平衡,这将非常适合我们的用户案例,但它似乎还没有在我们的代理上提供。

使用静态组成员身份实际上可能是最好的选择。也许值得升级您的代理以获得此功能。

顺便说一句,session.timeout.msmax.poll.interval.ms之间的差异在另一个问题中解释:对于Kafka 0.10.0.0和更高版本,session.timeout.ms和max.poll.interval.ms之间的差异

通常,代理端组协调器维护每个“组生成”的所有成员的列表。如果成员主动离开组(通过发送leaveGroupRequest)、超时(通过session.timeout.msmax.poll.interval.ms)或新成员加入组,则会触发重新平衡。如果发生了重新平衡,每个成员都有机会重新加入小组,以纳入下一代。

对于您的情况,小组只有一个成员。停止应用程序时,不会发送LeaveGroupRequest,因此组协调器将仅在session.timeout.ms通过后删除此成员。

如果你重新启动应用程序,它会作为一个“新”成员回来(从组协调者的角度来看)。这将触发一个重调,给组中的所有成员一个重新加入组的改变。对于您的情况,“旧的”实例可能仍在组中,因此只有在组协调器将旧成员从组中移除后,再平衡才会向前移动。问题可能是,小组协调员认为小组从一个成员扩大到两个成员。(这就是我上面的意思:如果将发送leavegrouprequest,那么当您停止应用程序时,组将变为空的,并且在重新启动时,只有新成员在组中,并且重新平衡将立即向前移动。)

使用静态组成员身份可以避免这个问题,因为在重新启动时,实例可以被重新标识为“旧的”实例,并且组协调器不需要等待旧的组成员过期。

 类似资料:
  • 我确实部署了单吊舱,自定义docker映像如下: 在开发过程中,我希望推送新的最新版本并更新部署。如果不明确定义标记/版本并为每个构建增加它,就找不到如何做到这一点,并且

  • 我正在尝试在Kubernetes集群(Azure AKS)中部署Flink作业。作业群集在启动后立即中止,但任务管理器运行正常。 docker镜像创建成功,没有任何异常。我可以运行docker镜像,也可以SSHdocker镜像。 我已经按照以下链接中提到的步骤: https://github.com/apache/flink/tree/release-1.9/flink-container/kub

  • 我有以下代码: 我创建了一个包含上述Python代码的映像的部署。 当我使用my Python代码不会创建sig文件指示,也不会打印“完成”消息。 点击此链接:https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace我看到k8s发送SIG

  • 我有一个Kafka Streams应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题。 每小时消耗/产生几百万条记录。每当我关闭一个代理时,应用程序就进入重新平衡状态,在重新平衡多次之后,它开始使用非常旧的消息。 注意:当Kafka Streams应用程序运行良好时,它的消费者滞后几乎为0。但再平衡之后,它的滞后从0到1000万。 这会不会是因为偏移.保留.分钟。 在这方面的任何帮助都将

  • 我最近开始研究Kubernetes集群。在我们的集群中,对给定Kubernetes服务的网络调用流如下所示: 外部非K8S负载均衡器- 对于给定的服务,有两个副本。通过查看副本中容器的日志,我可以看到调用被路由到不同的pod。据我所知,我们还没有为Kubernetes中的服务明确设置任何负载平衡策略。 我有几个问题: 1)K8S是否有默认的负载平衡策略?我读过库贝-proxy和随机路由。它看起来绝

  • 什么是负载均衡器? 负载平衡改进了跨多个计算资源(如计算机、计算机群集、网络链路、中央处理器或磁盘驱动器)的工作负载分布 NodePort不是负载平衡器。(我知道一旦流量在集群内,kube proxy就会在pod之间进行负载平衡)我的意思是,最终用户点击http://NODEIP:30111(例如)访问应用程序的URL。即使POD之间的流量是负载平衡的,用户仍然会点击一个节点,即“节点”,它是K8