当前位置: 首页 > 知识库问答 >
问题:

Kafka流关闭挂钩和意外异常处理在同一流应用程序中

拓拔高畅
2023-03-14

我的任务是拆除一个开发环境,并从废品中重新设置它,以验证我们的CI-CD流程;唯一的问题是我搞砸了创建一个主题,因此Kafka Streams应用程序退出并出现错误。

我仔细研究了一下,发现了问题并纠正了它,但当我深入研究时,我遇到了另一个奇怪的小问题。

我实现了一个意外的异常处理器,如下所示:

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    streams.close();

    // Maybe do some more exception handling.
    // Open a lock that is waiting after streams.start() call 
    // to let application exit normally
    shutdownLatch.countDown();
});

问题是,如果应用程序抛出一个异常,因为一个主题错误时,KafkaStreams::c失去的是调用应用程序似乎死锁在WindowsSeletorImpl::轮询后,尝试调用KafkaStreams::waitOnState。

我认为这可能是在异常处理程序中调用KafkaStreams::close的问题,但我发现这是这样的,Matthias J. Sax的评论说,在异常处理程序中调用KafkaStreams::close应该是可以的,但不要从多个线程中调用KafkaStreams::Close。

问题是,我想实现一个关闭挂钩,以便根据请求优雅地终止steams应用程序,并实现意外异常处理程序,以便在出现异常时进行清理并优雅地终止。

我提出了以下解决方案,该解决方案在调用close之前检查KafkaStreams的状态,它确实有效,但这似乎有点可疑,因为我可以看到除了运行(可能是Pending)之外的其他情况,我们希望确保KafkaStream::close它调用了。

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.fatal("Caught Shutdown request");
    // Do some shutdown cleanup.
    if (streams.state().isRunning())
    {
        If this hook is called due to the Main exiting after handling 
        an exception we don't want to call close again. It doesn't 
        cause any errors but logs that the application was closed 
        a second time.
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do a little bit more clean up before system exits.
    System.exit(0);

}));

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    if (streams.state().isRunning())
    {
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do some more exception handling.

    // Open the Gate to let application exit normally
    shutdownLatch.countDown();
    // Or Optionally call halt to immediately terminate and prevent call to Shutdown hook.
    Runtime.getRuntime().halt(0);
});

关于为什么在异常处理程序中调用KafkaSteams:close会导致这样的问题,或者是否有更好的方法来同时实现关闭钩子和异常处理程序,这将非常受欢迎?

共有1个答案

澹台聪
2023-03-14

从异常处理程序调用<code>close()</code>与从shutdown钩子调用略有不同close()如果从shutdown钩子(参见。https://issues.apache.org/jira/browse/KAFKA-4366)因此,您应该使用超时调用它。

此外,该问题与从Jira中描述的未捕获的异常处理程序中调用System.exit()有关。一般来说,调用System.exit()非常苛刻,恕我直言,应该避免。

您的解决方案似乎也不是100%健壮的,因为< code>streams.state()。isRunning()可能会导致争用情况。

使用超时的另一种方法可能是仅在关闭挂钩和异常处理程序中设置原子布尔,如果布尔标志设置为true,则使用“main()”线程调用关闭:

private final static AtomicBoolean stopStreams = new AtomicBoolean(false);

public static void main(String[] args) {
  // do stuff

  KafkaStreams streams = ...
  stream.setUncaughtExceptionHandler((t, e) -> {
    stopStreams.set(true);
  });

  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    stopStreams.set(true);
  });

  while (!stopStreams.get()) {
    Thread.sleep(1000);
  }
  streams.close();
}
 类似资料:
  • 问题内容: 曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。 我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等, 有人可以建议正确的做法吗?我应该使用 吗?或者,还有更好的方法? 如何处理重试? 问题答案: 这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异

  • 我正在使用 Spring靴:2.3.5.释放Spring云:Hoxton.SR8 我正在尝试Spring云流Kafka流应用程序。一切都运行良好,直到出现反序列化异常。应用程序每次都会关闭。 我想跳过不良记录,在Kafka主题中前进。但我无法实现这一点。配置: 我得到的错误是 现在我正在使用这个设置。它仍然没有效果。根据留档,它应该简单地记录错误并继续处理。即它应该跳过不良记录。但这并没有发生。看

  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

  • 主要内容:前记,1.processHandlerException方法前记 根据之前的文章方法中的方法返回处理的方法 1.processHandlerException方法 这个方法就是如果出现异常的话, 异常解析器进行处理异常。 先判断是否是注解下的方法, 如果是的话另外处理 -> 判断是否是注解下的方法 这里的主要有3个实现类 1.1注解下的异常 1.2注解下的方法 获取到装填码 获取到出错理由 然后渲染异常的页面 返回空的ModelAndView 1.3解析方

  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

  • 问题内容: 我需要在Java Web应用程序停止或tomcat停止时保存一些数据。如何才能做到这一点?编辑:如果我使用jvm shutdown钩有什么缺点? 问题答案: 使用在您的web.xml 中实现ServletContextListener的类: