当前位置: 首页 > 知识库问答 >
问题:

如何处理Kafka流中数据处理过程中发生的错误

奚飞星
2023-03-14

我正在使用Spring云流Kafka流编写Java应用程序。下面是我正在使用的函数方法片段:

@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return input ->
        input.transform(
            () ->
                new Transformer<String, String, KeyValue<String, String>>() {

                  ProcessorContext context;

                  @Override
                  public void init(ProcessorContext context) {
                    this.context = context;
                  }

                  @Override
                  public void close() {}

                  @Override
                  public KeyValue<String, String> transform(String key, String value) {                       
                         String result = fetch_data_from_database(key, value);
                         return new KeyValue<>(key, result);
                  }
});

fetch_data_from_database()可以抛出异常。

如果fetch\u from\u database()发生异常,如何停止对入站KStream的处理(不应提交偏移量),并使其使用相同的偏移量数据重试处理?

共有1个答案

郎祯
2023-03-14

在这种情况下,您需要自己重试该逻辑。为此,可以使用Spring的RetryTemplate。此答案详细介绍了如何在Kafka Streams中使用RetryTemplate。它不像您所使用的那样使用低级处理器API,但这是相同的想法。将数据库调用包装在重试模板中,并根据需要自定义重试。任何上游处理都将暂停,直到重试完毕。

 类似资料:
  • 我用Spring云溪和Kafka溪。假设我有一个处理器,它的功能是将KStream字符串转换为KStream CityProgrammes。它调用一个API来根据名称查找城市,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是,任何错误发生在转换期间,整个应用程序停止。我想把一个特定的消息发送给DLQ,然后继续前进。我已经读了几天了,每个人都建议在被调用的服务中处理错误,但在我看来这是一个

  • 我有以下Kafka配置类: 以及以下KafkaListener: 我想使用<code>SeekToCurrentErrorHandler</code>进行错误处理,我想使用类似于这里的特定功能,但目前我正在使用<code>springBootVersion=2.0.4。你能帮我设置依赖项和配置以处理Kafka消费者中的错误吗? 问候!

  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

  • 我正在Kafka流中的处理器节点上工作。对于一个简单的代码,我编写如下代码只是为了过滤用户ID,这是在kafka流中处理处理器节点的正确方法吗? 但是,下面的代码没有编译,抛出了一个错误:

  • 问题内容: 曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。 我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等, 有人可以建议正确的做法吗?我应该使用 吗?或者,还有更好的方法? 如何处理重试? 问题答案: 这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异