当前位置: 首页 > 知识库问答 >
问题:

如何在Spring Kafka Listener上捕捉“Broker可能不可用”的警告

梁烨
2023-03-14

我正在研究一个POC,用于在我的项目中实现Kafka集群。我已经设置了一个Kafka集群在我的本地机器与3经纪人。现在,我使用Spring MVC REST服务向Kafka服务器发送消息,该服务在内部使用Spring Kafka生成和使用往返于Kafka集群的消息。现在,我试图发送警报时,消费者无法接收消息从主题时,经纪人是下降。我关闭消费者连接到的唯一代理。我没有得到任何异常在我的日志,但我得到了以下警告信息。

0:20:35.500[TEST_GROUP-0-C-1]WARNo.apache.kafka.clients.NetworkClient-[消费者clientId=消费者-1, group pId=TEST_GROUP]无法建立到节点2147483645的连接。经纪人可能不可用。

是否可以捕获此警告消息,以便在消费者失去连接时发送警报?下面是我的消费代码。

private static final Logger LOGGER = LoggerFactory.getLogger(ListenerServiceImpl.class);
    @Autowired
    Dao<RnMessage> messageDao;
    @Autowired
    MessageService messageService;

    @KafkaListener(id = "TEST_GROUP", topics = "TESTQUEUE", errorHandler="eventQueueMessageListenerExceptionHandler")
    public void listenMessageInQueue(String msg) {

        try {
            //String str = new String(msg, "UTF-8");
            LOGGER.info("receiving payload='{}'", msg);
            messageDao.saveMessage(msg);
            messageService.sendMessageToOutQueue(msg);
        }catch(Exception e) {
            e.printStackTrace();
        }       
    }

共有1个答案

慕和惬
2023-03-14

当消费者无法轮询代理时,它将抛出一个Non响应消费者事件。可以使用@EventListener捕获此事件。

下面是一个示例代码:

@EventListener()
public void eventHandler(NonResponsiveConsumerEvent event) {
    //When Kafka server is down, NonResponsiveConsumerEvent error is caught here.
    System.out.println("CAUGHT the event "+ event);
}

你可以在文档中看到更多细节。https://docs.spring.io/spring-kafka/reference/htmlsingle/#idle-containers

我在文件中特别提到这一点。

如果代理不可达(在撰写本文时),则不退出消费者投票()方法,因此不会收到任何消息,也无法生成空闲事件。为了解决这个问题,容器将发布一个Non响应消费者事件

JUnit测试代码正在运行https://github.com/shankarps/KafkaDemo/blob/master/src/test/java/com/kafkademo/stack/TestConsumerWithNonAvailableBroker.java

 类似资料:
  • 我正在开发一个POC,用于在我的项目中实现一个kafka集群。我已经在我的本地机器中设置了一个有3个经纪人的kafka集群。现在,我正在使用Spring MVC REST服务向Kafka服务器发送消息,该服务在内部使用Spring Kafka来生成和使用往返于Kafka集群的消息。现在我正在尝试在代理关闭时,当使用者无法接收来自主题的消息时发送警报。我关闭使用者连接到的唯一代理。我的日志中没有任何

  • 问题内容: 我需要捕捉一些从PHP本机函数抛出的警告,然后处理它们。 特别: DNS查询失败时,它将引发警告。 / 不起作用,因为警告也不例外。 我现在有2个选择: 似乎有点过分,因为我必须使用它来过滤页面中的每个警告(这是真的吗?); 调整错误报告/显示,以使这些警告不会在屏幕上显示,然后检查返回值;如果为,则找不到主机名的记录。 这里的最佳做法是什么? 问题答案: 设置和还原错误处理程序 一种

  • 我和我的Kafka制作人遇到了一个奇怪的问题。我使用Kafka-0.11服务器/客户端版本。我有一个zookeper和一个kafka经纪人节点。此外,我还创建了带有3个分区的“事件”主题: 在我的Java代码中,我创建了具有以下属性的producer: 此外,我还向Producer#send()方法添加了一个回调,该方法将失败的消息添加到队列中,该队列由另一个“重新发送”线程在循环中迭代: 一切正

  • [kafka-producer-network-thread>producer-1]警告org.apache.kafka.clients.networkclient-[Producer clientid=producer-1]无法建立到节点0的连接(/xx.xx.xx.xx.xx:9092)。代理可能不可用。 [主]信息org.apache.Kafka.clients.Producer.kafka

  • 指的是指数的Spring综合路线。html,我正在尝试使用regex来捕获所有路由。但我得到了一个错误: 标记错误:java.util.regex.模式语法异常,在索引2处悬挂元字符“*” 当我通过修复这个问题时,我的正则表达式无法捕捉路由。 要求就同样的问题提供指导。

  • 在节点中。在js服务器上,捕获SIGTERM和捕获SIGINT有什么区别吗? 我认为进程不应该能够防止SIGINT关闭? 我是否能够捕获两个信号并阻止退出?我的实验表明答案是肯定的,但从我所读到的内容来看,SIGINT总是假设关闭一个进程。 或者我把SIGINT和SIGKILL混淆了?也许SIGKILL是我无法恢复的信号? 捕捉这些信号当然可以让我优雅地关机: 我想我把SIGINT和SIGKILL