当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Kafka流中的错误处理

陈修诚
2023-03-14

我用Spring云溪和Kafka溪。假设我有一个处理器,它的功能是将KStream字符串转换为KStream CityProgrammes。它调用一个API来根据名称查找城市,并调用另一个转换来查找该城市附近的任何事件。

现在的问题是,任何错误发生在转换期间,整个应用程序停止。我想把一个特定的消息发送给DLQ,然后继续前进。我已经读了几天了,每个人都建议在被调用的服务中处理错误,但在我看来这是一个不必要的事情,而且我仍然需要返回一个kstream:我如何在catch中做到这一点?

我还查看了UncaughtExeptionHandler,但它不知道消息,只能重新启动处理,不会跳过这个无效的消息。

这可能听起来像是一个A-B问题,所以问题的措辞是:当发生异常并将无效项发送到DLQ时,我如何维护KStream中的流?

共有1个答案

潘佐
2023-03-14

当涉及到应用程序级别的错误时,如何处理错误取决于应用程序本身。Kafka Streams和Spring Cloud Stream绑定器主要支持框架级别的反序列化和序列化错误。虽然是这样,但我认为你的场景是可以处理的。如果您使用的是Kafka客户端2.8之前的版本,这里有一个类似的答案:https://stackoverflow.com/A/66749750/2070861

如果您正在使用Kafka/Streams2.8,这里有一个您可以使用的想法。但是,下面的代码应该只作为一个起点。根据您的用例进行调整。阅读更多关于Kafka Streams2.8中分支如何工作的信息。分支API在2.8中与以前的版本进行了显著的重构。

public Function<KStream<?, String>, KStream<?, Foo>> convert() {
            Foo[] foo = new Foo[0];
            return input -> {
                final Map<String, ? extends KStream<?, String>> branches =
                        input.split(Named.as("foo-")).branch((key, value) -> {
                                    try {
                                        foo[0] = new Foo(); // your API call for CitiProgramme converion here, possibly.
                                        return true;
                                    }
                                    catch (Exception e) {
                                        Message<?> message = MessageBuilder.withPayload(value).build();
                                        streamBridge.send("to-my-dlt", message);
                                        return false;
                                    }

                                }, Branched.as("bar"))
                                .defaultBranch();

                final KStream<?, String> kStream = branches.get("foo-bar");
                return kStream.map((key, value) -> new KeyValue<>("", foo[0]));
            };

        }


    }

默认分支在此代码中被忽略,因为它只包含引发异常的记录。这些由上面的catch语句处理,在该语句中,我们以编程方式将记录发送到DLT。最后,我们获取好的记录并将它们映射到新的kstream并通过出站发送。

 类似资料:
  • 问题内容: 我具有以下Spring Integration配置,该配置允许我从MVC Controller调用网关方法并让控制器返回,而集成流将在不阻塞控制器的单独线程中继续进行。 但是,我无法弄清楚如何使我的错误处理程序为该异步流工作。我的网关定义了错误通道,但是由于某种原因我的异常没有到达。相反,我看到被调用了。 网关: 为了查看我的错误处理程序正在处理的异步集成流程中发生的异常,我该怎么办?

  • 我有一个这样的集成流程:

  • 我正在使用Spring云流Kafka流编写Java应用程序。下面是我正在使用的函数方法片段: fetch_data_from_database()可以抛出异常。 如果fetch\u from\u database()发生异常,如何停止对入站KStream的处理(不应提交偏移量),并使其使用相同的偏移量数据重试处理?

  • 问题内容: 解析器不知道要做什么时的默认行为是将消息打印到终端,例如: 第1:23行在“}”处缺少DECIMAL 这是一个很好的信息,但是在错误的位置。我宁愿将此作为例外。 我尝试使用,但是会抛出一个没有消息的消息(由引起,也没有消息)。 有什么办法可以让我通过异常报告错误,同时又保留消息中的有用信息? 这就是我真正想要的—我通常在规则中使用动作来构建对象: 然后,当我调用解析器时,我将执行以下操

  • 问题内容: 我有一系列要解决的承诺 我继续继续诺言链。看起来像这样 我想添加一个catch语句来处理单个promise,以防万一出错,但是当我尝试时,返回它发现的第一个错误(忽略其余的),然后我无法从其余的promise中获取数据数组(没有错误)。 我尝试做类似.. 但这并不能解决。 谢谢! - 编辑: 下面的答案完全正确,但代码由于其他原因而中断。如果有人感兴趣,这就是我最终得到的解决方案… 节

  • 问题内容: 有没有一种方法来处理错误,比如,你可以在使用块,等等? 我已经在网上搜索过,发现的唯一选择是global变量,但是它没有按我预期的那样工作,例如,以下代码: 确实会以某种方式回滚,因为我在table1上更改的名称保留了我在此处进行测试时的旧值,但是它不会打印消息,也不会执行我在导致错误的指令之后放置的任何指令 有人可以帮我吗?您知道Sybase错误处理是如何工作的吗? 问题答案: 第一