在我工作的公司,我们使用< code>Spring for Kafka而不进行身份验证,最近我们做了一些实验来设置Kafka中的安全性,我们启用了短暂的身份验证,这导致了我们微服务中所有消费者/生产者的崩溃!(微服务熬夜)
例外情况:
Authorization Exception and no authorizationExceptionRetryInterval set
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: foo-group
经过一些研究,我们发现这是kafka客户端的预期行为,我们需要设置< code > authorizationExceptionRetryInterval 属性
公共void setAuthorizationExceptionRetryInterval(java.time.Duration授权ExceptionRetryInterval)
设置KafkaConsumer引发AuthorizationException后的重试间隔。默认情况下,该字段为空,并且禁用重试。在这种情况下,容器将被停止。间隔必须小于max.poll.interval.ms使用者属性。
以下是一些其他有用的链接
设置授权异常重试春卡节
为什么SpringKafka消费者暂停n个主题的所有消费,当一个人未能授权时
我想知道的是:
只有在下列情况下才会停运集装箱:
授权例外
无授权例外重试间隔
无偏移分区异常
- 当ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
不是最早
或最新的
,并且此使用者组的分区没有现有偏移量时引发。受防护实例 Id例外
- 使用事务和静态组成员(意味着其他某个实例正在使用此实例 ID)。停止后Geofence例外
- 当停止当Geofence
为真时(默认为假) - 仅适用于交易错误
(如欧姆)我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前
我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:
我在kafka消费者文档中看到了这个注释-
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?