当前位置: 首页 > 知识库问答 >
问题:

如何在SpringKafka监听器上捕捉“经纪人可能不可用”的警告

徐鸿达
2023-03-14

我正在开发一个POC,用于在我的项目中实现一个kafka集群。我已经在我的本地机器中设置了一个有3个经纪人的kafka集群。现在,我正在使用Spring MVC REST服务向Kafka服务器发送消息,该服务在内部使用Spring Kafka来生成和使用往返于Kafka集群的消息。现在我正在尝试在代理关闭时,当使用者无法接收来自主题的消息时发送警报。我关闭使用者连接到的唯一代理。我的日志中没有任何异常,但我收到以下警告消息。

0:20:35.500[TEST_GROUP-0-C-1]警告O.apache.kafka.clients.networkclient-[Consumer clientid=consumer-1,groupid=test_group]无法建立到节点2147483645的连接。代理可能不可用。

是否有可能捕获此警告消息,以便我可以在我的消费者失去连接时发送警报?下面是我的消费者代码。

private static final Logger LOGGER = LoggerFactory.getLogger(ListenerServiceImpl.class);
    @Autowired
    Dao<RnMessage> messageDao;
    @Autowired
    MessageService messageService;

    @KafkaListener(id = "TEST_GROUP", topics = "TESTQUEUE", errorHandler="eventQueueMessageListenerExceptionHandler")
    public void listenMessageInQueue(String msg) {

        try {
            //String str = new String(msg, "UTF-8");
            LOGGER.info("receiving payload='{}'", msg);
            messageDao.saveMessage(msg);
            messageService.sendMessageToOutQueue(msg);
        }catch(Exception e) {
            e.printStackTrace();
        }       
    }

共有1个答案

狄海
2023-03-14

当使用者无法轮询代理时,它将抛出NonResponveConsumerEvent。可以使用@EventListener捕获此事件。

下面是示例代码:

@EventListener()
public void eventHandler(NonResponsiveConsumerEvent event) {
    //When Kafka server is down, NonResponsiveConsumerEvent error is caught here.
    System.out.println("CAUGHT the event "+ event);
}

你可以在文档中看到更多的细节。https://docs.spring.io/spring-kafka/reference/htmlsingle/#空闲容器

我在文件中特别提到这一点。

如果代理不可达(在编写本文时),则使用者poll()方法不会退出,因此不会接收到任何消息,也无法生成空闲事件。为了解决这个问题,容器将发布一个nonResponveConsumerEvent

 类似资料:
  • 我正在研究一个POC,用于在我的项目中实现Kafka集群。我已经设置了一个Kafka集群在我的本地机器与3经纪人。现在,我使用Spring MVC REST服务向Kafka服务器发送消息,该服务在内部使用Spring Kafka生成和使用往返于Kafka集群的消息。现在,我试图发送警报时,消费者无法接收消息从主题时,经纪人是下降。我关闭消费者连接到的唯一代理。我没有得到任何异常在我的日志,但我得到

  • 我在Windows子系统Linux上安装了kafka,并开始使用命令服务启动,所有服务都已启动。现在,当我尝试从Windows运行我的kafka-spring应用程序时,它显示以下错误:- 无法建立与节点-1(localhost/127.0.0.1:9092)的连接。经纪人可能不可用。 我的服务器属性是:- 我哪里出错了???

  • 问题内容: 我需要捕捉一些从PHP本机函数抛出的警告,然后处理它们。 特别: DNS查询失败时,它将引发警告。 / 不起作用,因为警告也不例外。 我现在有2个选择: 似乎有点过分,因为我必须使用它来过滤页面中的每个警告(这是真的吗?); 调整错误报告/显示,以使这些警告不会在屏幕上显示,然后检查返回值;如果为,则找不到主机名的记录。 这里的最佳做法是什么? 问题答案: 设置和还原错误处理程序 一种

  • 当spring-boot应用程序运行时,如果我完全关闭代理程序(包括kafka和zookeeper),我会在控制台中看到这个警告,持续很长时间。 [org.springframework.kafka.kafkalistenerendpointcontainer#0-0-c-1]警告o.apache.kafka.clients.networkclient-[Consumer clientid=con

  • 问题内容: 我有以下结构: 通过kafka-topics shell脚本创建了具有复制因子3和分区3的主题。 并使用组localConsumers。领导没事的时候工作正常。 消费者日志 但是,如果领导者倒下了-我在消费者中遇到了错误(systemctl stop kafka): 节点3不可用。好 消费者日志 使用者无法连接,直到领导者掉线或与另一个使用者组重新连接。 不明白为什么会这样?消费者应重

  • 我正在写一个shell脚本来监视Kafka经纪人。 我浏览了一些链接,发现如果ZooKeeper包含一个代理列表,如果这个列表中有IP地址,那么一个kafka代理正在运行。 我想要一个命令,我可以在我的shell脚本中使用,以获得代理列表,并检查kafka是否正在运行。 是否有命令可以像ElasticSearch一样获取kafka集群状态?