当前位置: 首页 > 知识库问答 >
问题:

在Kafka客户(消费者/生产者)倒下后恢复他们

苗盛
2023-03-14

在我工作的公司,我们使用< code>Spring for Kafka而不进行身份验证,最近我们做了一些实验来设置Kafka中的安全性,我们启用了短暂的身份验证,这导致了我们微服务中所有消费者/生产者的崩溃!(微服务熬夜)

例外情况:

Authorization Exception and no authorizationExceptionRetryInterval set

org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: foo-group

经过一些研究,我们发现这是kafka客户端的预期行为,我们需要设置< code > authorizationExceptionRetryInterval 属性

公共void setAuthorizationExceptionRetryInterval​(java.time.Duration授权ExceptionRetryInterval)

设置KafkaConsumer引发AuthorizationException后的重试间隔。默认情况下,该字段为空,并且禁用重试。在这种情况下,容器将被停止。间隔必须小于max.poll.interval.ms使用者属性。

以下是一些其他有用的链接

设置授权异常重试春卡节

为什么SpringKafka消费者暂停n个主题的所有消费,当一个人未能授权时

我想知道的是:

  1. 失败的身份验证是消费者/生产者出现故障的唯一情况吗?
  2. 如果还有其他一些情况,如何确保我们的消费者/生产者在没有人为干预(重新启动微服务)的情况下恢复?换句话说,如何检查消费者/生产者是否启动并重新启动它们?

共有1个答案

燕钟展
2023-03-14

只有在下列情况下才会停运集装箱:

  • 授权例外授权例外重试间隔
  • 无偏移分区异常 - 当ConsumerConfig.AUTO_OFFSET_RESET_CONFIG不是最早最新的,并且此使用者组的分区没有现有偏移量时引发。
  • 受防护实例 Id例外 - 使用事务和静态组成员(意味着其他某个实例正在使用此实例 ID)。
  • 停止后Geofence例外 - 当停止当Geofence为真时(默认为假) - 仅适用于交易
  • 任何错误(如欧姆)
 类似资料:
  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我在kafka消费者文档中看到了这个注释-

  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?