当前位置: 首页 > 知识库问答 >
问题:

Spring cloud kafka流应用程序因异常而终止

宰父焕
2023-03-14

我有一个简单的Spring云kafka流应用程序。每次有异常时,应用程序都会终止,我无法覆盖此行为。当有某些类型的异常时,所需的结果是增量退回,或者继续其他类型的异常。我使用spingCloudVersion-Hoxton.SR3Spring启动:2.2.6.RELEASE

application.yaml

spring:
 cloud:
   stream:
     binders.process-in-0:
         destination: test

     kafka:
       streams:
         binder:
           deserializationExceptionHandler: logAndContinue
           configuration:
             default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
             default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

豆子

@Bean
public java.util.function.Consumer<KStream<String, String>> process() {
    return input -> input.process(() -> new EventProcessor());
}

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                ContinueOnErrorHandler.class);
    };
}

事件处理器

public class EventProcessor implements Processor<String, String>, ProcessorSupplier<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {

        throw new RuntimeException("Some exception");
    }

    @Override
    public void close() {

    }

    @Override
    public Processor<String, String> get() {
        return this;
    }
}

继续错误处理程序

public class ContinueOnErrorHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE;
}

    @Override
    public void configure(Map<String, ?> configs) {
        //ignore
    }
}

共有1个答案

苏鸿波
2023-03-14

您从消费者处使用的自定义处理器正在进程方法中抛出Runtime Exc的。它不会被任何东西捕获。当抛出该异常时,应用程序只需退出。

您正在使用的生产异常处理程序在这里没有任何效果,因为您在这里没有生产任何内容消费者不生产任何产品。如果您有生产产品的用例,您应该切换到java.util.funciton。改为函数

为了解决这个问题,当您在自定义处理器(< code>EventProcessor)中处理记录时,如果您得到一个异常,您应该捕捉它并采取适当的操作。例如,这里有一个模板:

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            try {
                // start processing
                // exception thrown

            }
            catch (Exception e){
                // Take the appropriate action
            }
        }

这样,当处理器中引发异常时,应用程序就不会终止。

 类似资料:
  • 每次尝试创建Grails项目时,都会遇到以下错误。我可以从终端创建项目,但不能从IDE。我使用的是GGTS,对Grails是新手。 命令以异常终止:java.lang.exception(部分输出请参见详细信息)命令:GrailsCommand(p/booksolution>compile-non-interactive-refresh-dependencies)----system.out---

  • ...有点。正如这个极其简单的例子所示, 我的一个应用程序很少以这种方式崩溃(到目前为止只报告过一次)。当一个不确定的异常发生时,我想像往常一样终止它。我的策略是(低级)记录问题,然后终止。应用程序是子系统的一部分,如果检测到任何问题,我想(重新)启动它。它是用C -Builder 6构建的,在Windows (XP)上运行...7,也是8)。我了解到< code>abort()很可能导致了这个错

  • 问题内容: 我正在尝试创建一个简单的手电筒应用程序,以学习android开发。我正在尝试获取它,以便当您单击灯光ImageView对象时,它会更改图像。但是现在,当调试器进入light.setImageResource()时,它崩溃。 它抛出的错误是 问题答案: 崩溃: 正如您的堆栈跟踪所说:“ java.lang。OutOfMemoryError : 未能分配51840012字节分配,其中包含4

  • 应用程序由于stacktrace的StackOverflow错误而不时崩溃(见下文)。 该错误意外地出现在各种方案中。我们无法模拟它或预测它的发生。 提前谢谢你。 Stacktrace错误(缩短版):

  • 应用程序抛出一个 堆栈跟踪 这个问题似乎已经在spark 1.1中解决了。0根据此链接 火花:1.1。Kafka:0.8。1.1