当前位置: 首页 > 知识库问答 >
问题:

断开连接时停止消费者(Spring Boot Kafka)

施季
2023-03-14

我需要帮助消费者在Spring启动。当断开连接时,我需要停止应用程序,例如10分钟。当断开连接时

2021-08-16 18:39:21.927  WARN 15548 --- [ntainer#0-0-C-1] o.a.k.c.NetworkClient                    : [Consumer clientId=topicTest, groupId=goupTest-] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-08-16 18:39:24.942  WARN 15548 --- [ntainer#0-0-C-1] o.a.k.c.NetworkClient                    : [Consumer clientId=topicTest, groupId=goupTest] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-08-16 18:39:24.943  INFO 15548 --- [ntainer#0-0-C-1] o.a.k.c.FetchSessionHandler              : [Consumer clientId=topicTest, groupId=goupTest] Error sending fetch request (sessionId=1591961485, epoch=INITIAL) to node 1:

org.apache.kafka.common.errors.DisconnectException: null

或者当无法连接时

2021-08-16 18:42:15.960  WARN 23068 --- [ntainer#0-0-C-1] o.a.k.c.NetworkClient                    : [Consumer clientId=topicTest, groupId=goupTest] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-08-16 18:42:15.961  WARN 23068 --- [ntainer#0-0-C-1] o.a.k.c.NetworkClient                    : [Consumer clientId=topicTest, groupId=goupTest] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-08-16 18:42:19.125  WARN 23068 --- [ntainer#0-0-C-1] o.a.k.c.NetworkClient                    : [Consumer clientId=topicTest, groupId=goupTest] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-08-16 18:42:19.126  WARN 23068 --- [ntainer#0-0-C-1] o.a.k.c.NetworkClient                    : [Consumer clientId=topicTest, groupId=goupTest] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

我使用ConsumerFactory和ConcurrentKafkaListenerContainerFactory进行消费者的所有配置

共有1个答案

齐永昌
2023-03-14

Kafka客户端将自动重试以从断开连接中恢复,您应该会看到更多日志,您也可以将日志级别更改为“调试”以查看更多详细信息。

另请参阅此票据,了解未来有关此问题的更多更新:https://issues.apache.org/jira/browse/KAFKA-6520 .

 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢

  • 我在项目中使用solace作为JMS提供者。我使用spring CachingConnectionFactory检索连接。在这个连接上,我创建了一个新会话。我在那个会话中创建了一个消费者的线程。 我正在做一些故障转移测试。当我将服务器从网络连接上拔下时,它会失败。当我再次连接服务器时,仍会收到相同的异常: 更重要的是,CachingConnectionFactory默认将reConnectOnEx

  • 我正在使用带有KafkaListener注释的spring kafka v2.5.2。 在运行时,我希望能够向消费者发送停止消费的信号。 我看到了autoStartup参数,但它似乎只对初始化有效,之后无法更改。 我看到了KafkaListenerEndpointRegistry的methode close()。。。 你有什么建议吗? 提前谢谢。

  • 我想在远程位置检查Kafka消费者的连接。 可以确定是否将使用者分配给分区。 在远程位置,我可以从Kafka代理获得有关该主题的详细信息。 但是消费者能否保证消费者能够收到消费者与主题分区匹配的消息?

  • 我是一个新的Kafka和使用Apache kafka消费者读取消息从生产者。但当我停下来开始一段时间。之间产生的所有消息都将丢失。如何处理这种情况。我正在使用这些属性“auto.offset.reset”、“latest”和“enable.auto.commit”、“false”。 这是我正在使用的代码。任何帮助都是感激的。