当前位置: 首页 > 知识库问答 >
问题:

如何跳过在Kafka中产生运行时异常的记录并保持流运行?

乌俊健
2023-03-14

我已经实现了kafka流应用程序。假设流当前正在处理的对象的一个字段包含一个数字而不是一个字符串值。当前,当处理逻辑(如< code >)中出现异常时。transform()方法,整个流被终止,我的应用程序停止处理数据。

我想跳过此类无效记录并继续处理输入主题上可用的下一条记录。此外,我不想在我的流处理代码中实现任何 try-catch 语句。

为了实现这一点,我实现了StreamsUncaughtExceptionHandler,因此它返回StreamThreadExceptionResponse.REPLACE_THREAD枚举,以便生成新线程并继续处理等待输入主题的下一条记录。然而,事实证明,流消费者偏移量没有提交,当新线程启动时,它会获取刚刚杀死前一个流线程的旧记录…由于逻辑相同,新线程也将无法处理错误记录,并再次失败。某种循环产生新线程,每次都在同一记录上失败。

是否有任何干净的方法来跳过失败的记录并保持流处理下一个记录?

请注意,我不是在询问DeserializationExceptionHandlerProductionExceptionHandler

共有2个答案

赫连宏伯
2023-03-14

您可以过滤不匹配模式的事件,或者在转换事件之前验证它们

蓬英逸
2023-03-14

当涉及到应用程序级代码时,如何处理异常主要取决于应用程序。这个用例以前就出现过。请参阅前面的堆栈溢出线程。

使用Kafka Streams Binder和函数式处理器处理Spring Cloud Streams中处理异常的示例

当控件转到catch块时如何停止发送到kafka主题函数kafka spring

尝试查看这些答案是否可以应用于你的方案。

 类似资料:
  • 我正在努力定制我的spring kafka streams应用程序。我一直试图在我的KStreams上配置处理未捕获(运行时异常)。 参考文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_st

  • 我的项目依赖于scala日志库,我正在尝试升级我的项目以使用dotty。为此,我从https://github.com/lampepfl/dotty-example-project克隆了示例dotty项目并更新了它,包括日志库。请在下面找到项目的目录结构:- 项目/build.properties:- 项目/plugins.sbt 建筑sbt公司 src/main/scala/main。斯卡拉 上

  • 问题内容: 假设我有一个抛出运行时异常的方法。我正在使用a 来对列表中的项目调用此方法。 现在,我希望处理列表中的所有项目,并将单个项目上的所有运行时异常收集到“聚合”运行时异常中,该异常将在最后抛出。 在我的真实代码中,我正在进行第三方API调用,这可能会引发运行时异常。我想确保所有项目都已处理,并在最后报告所有错误。 我可以想到几种破解方法,例如捕获并返回异常的函数( ..shudder ..

  • 假设我有一个抛出运行时异常的方法。我正在使用对列表中的项调用此方法。 现在我希望列表中的所有项目都被处理,并且单个项目上的任何运行时异常都被收集到一个“聚合”运行时异常中,该异常将在最后抛出。 在我的真实代码中,我正在进行第三方API调用,这可能会引发运行时异常。我想确保所有项目都得到处理,并且在最后报告任何错误。 我可以想出一些方法来解决这个问题,比如一个函数,它捕获并返回异常(... shud

  • 然后跳过用@wip和@test标记的场景。 所以,我想知道,是否可以跳过单个标签中的两个或多个标签,即第1点提到的标签?

  • 我正在为spring调度器使用cron表达式,表达式的值由Springbean使用spel动态提供。 一切都正常工作,唯一的问题是我想记录调度程序何时触发,因为cron表达式是由另一个bean在运行时提供的。所以只想知道是否将通过文件提供的正确表达式映射到属性bean。