当前位置: 首页 > 知识库问答 >
问题:

如何在Spring Kafka Stream中配置UncaughtExceptionHandler

夏意蕴
2023-03-14

我正在使用spring kafka来实现一个使用spring Boot 1.5.16的流应用程序。我们使用的SpringKafka版本是1.3.8。释放。

我正在搜索一种方法,以便在出现终止与Kafka流关联的所有线程的错误时关闭启动应用程序。我发现在KafkaStreams中,有可能为未捕获的异常注册句柄。方法是setGlobalStateRestoreListener。

我看到这个方法在类型kstreambilderfactorybean中的SpringKafka中公开了。

我的问题如下。有没有一种简单的方法可以将UncaughtExceptionHandler注册为bean并让Spring在工厂bean中正确注入?或者我应该创建自己的KStreamBuilderFactoryBean并手动设置处理程序?

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
    final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
            streamsConfig);
    streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
        // Something happens here
    });
    return streamBuilderFactoryBean;
}

谢谢。

共有1个答案

景宏富
2023-03-14

对在旧版本中,您必须通过适当的注入和KafkaStreamsDefaultConfiguration自己指定一个KStreamBuilderFactoryBean。默认\u KSTREAM\u BUILDER\u BEAN\u NAME。

在以后的版本中,我们已经有了一个StreamsBuilderFactoryBeanConfigurer,可以保持自动配置的KStreamBuilderFactoryBean,但可以根据需要对其进行任何修改。

更新

您只需在应用程序上下文中将其创建为bean,框架就会将其应用到StreamsBuilderFactoryBean上:

    @Bean
    StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
        return new StreamsBuilderFactoryBeanConfigurer() {

            @Override
            public void configure(StreamsBuilderFactoryBean factoryBean) {
                factoryBean.setCloseTimeout(...);
            }

            @Override
            public int getOrder() {
                return Integer.MAX_VALUE;
            }

        };
    }
 类似资料:
  • 问题内容: 当我将struts.xml放入web-inf时显示错误......当我将src文件夹放入时,它工作正常。(在web- inf中为applicationcontext.xml) 当我将applicationcontext.xml放在src文件夹中时(在(src文件夹或src / resources)中为struts.xml).......这就是说无法在web-inf文件夹中找到appli

  • 问题内容: 我有一个正在Swing中构建的应用程序。它具有可滚动和缩放的图表组件,可以平移和缩放它。整个过程很平滑,除了有时UI会暂停约750 ms,我不知道为什么。这种情况并非总是会发生-但有时应用程序中会发生某些事情,并且每6-8秒就会开始暂停一次。 很显然,EDT上有一些事件需要花费750毫秒左右的时间才能运行,这不应该发生。 我如何特别像这样配置EDT?我真正想做的是获取在每次事件在EDT

  • https://docs . JBoss . org/hibernate/ORM/5.2/javadocs/org/hibernate/interceptor . html说onPrepareStatement(字符串sql)不推荐使用。如果希望检查和修改SQL语句,请提供StatementInspector。 但是我不清楚如何在hibernate中在应用程序级别配置StatementInspec

  • 我在学习Spring Boot时遇到了一些编码问题;我想添加一个像Spring3.x那样的CharacterEncodingFilter。就像这样:

  • 我试图在我的应用程序中实现jpa 发展能力 所以,我的DataConfig看起来像这样: 但是,当我尝试启动应用程序时,会出现错误消息。制造战争- 已连接到服务器[2018-09-26 09:54:32631]工件未命名:正在部署工件,请稍候。。。2018年9月26日09:54:34.460警告[RMI TCP连接(3)-127.0.0.1]组织。阿帕奇。公猫dbcp。dbcp2。基本资源工厂。g

  • 我有一个使用以下方法的会话豆: 返回的CalculationResult无法映射到JSON,并出现以下异常: 如何在 Wildfly 中配置杰克逊及其?

  • 是否可以覆盖嵌入的 CXF JaxRS 容器的 TomEE 中的设置?我一直在尝试集成我自己的拦截器来处理JaxRS服务的路由。 我似乎不能做的是让TomEE在启动我的服务时加载拦截器。 我读过很多文章,其中说我应该能够通过添加以下内容来配置TomEE中的CXF内容 我还看到了一些引用,说明TomEE使用了TomEE。xml替换openejbjar。xml,所以我尝试在那里进行pojo部署,但由于

  • 我使用这个命令运行我的spring应用程序-java-jar,但是当从IDE运行应用程序时,我需要添加运行配置,下面是显示我所添加内容的屏幕截图,但是它对我不起作用。