当前位置: 首页 > 知识库问答 >
问题:

如何处理apache beam管道中的异常?

查飞星
2023-03-14

 PipelineResult.State state = pipeline.run().waitUntilFinish();
 if(state == PipelineResult.State.FAILED){
            System.out.println("failed");
        }else if(state==PipelineResult.State.DONE){

           

            System.out.println("Done");

        }

共有1个答案

何浩荡
2023-03-14

我建议研究如何在管道的每一步处理错误,而不是试图处理管道失败。

例如,WriteToBigQuery接收器的默认重试策略是always,因此即使错误的格式不是可以成功写入BigQuery的格式,也将始终重试。您可以考虑实现死信模式来捕获错误并将它们写入不同的源。

我只使用Kafka和PubSub作为管道的输入,所以我不确定从文件读取时如何处理错误,以及您可能会遇到什么问题。

在读取和写入数据之间创建审核或任何其他处理步骤时,您可以使用标记输出,并尝试捕获从一个步骤中为成功和失败的输出创建两个不同的输出,然后将它们写入任何您喜欢的地方

 类似资料:
  • 问题内容: 有谁知道bash如何通过管道发送数据? 此命令是否将file.txt的所有内容打印到缓冲区中,然后由tail读取?还是说,此命令是逐行打印file.txt的内容,然后在每一行停顿以进行尾部处理,然后请求更多数据? 我问的原因是我要在嵌入式设备上编写程序,该程序基本上对某些数据块执行一系列操作,其中一个操作的输出作为下一个操作的输入发出。我想知道linux(bash)是如何处理的,因此请

  • 问题内容: 先前曾问过一个有关如何覆盖环境指令中定义的变量的问题,看来这是不可能的。 我想在一个阶段中设置一个变量,并使其可用于其他阶段。在声明性管道中,似乎唯一的方法是在script {}块中。 例如,我需要在结帐后设置一些变量。因此,在结帐阶段结束时,我有一个script {}块来设置这些var,并且可以在其他阶段访问它们。 这可行,但感觉不对。并且出于可读性考虑,我更愿意在管道的顶部声明这些

  • 我们正在将Oracle Weblogic server 8.1升级到Weblogic server 12c,并将java 1.4升级到1.8 我的任务是确保应用程序功能保持不变。一些应用程序自2007年以来就没有碰过。 前面的代码是: 搜索“weblogic.jar”(在“\Oracle\Middleware\Oracle\u Home\wlserver\server\lib”中找到),我看到它没

  • 我目前正在尝试为spring boot实现一个自定义的错误处理程序,我已经用以下方法实现了它: 不知为什么这不起作用,并且异常仍然被抛给客户端,是否有某种方法捕获方法抛出的异常并忽略它。

  • 作为我正在构建的应用程序的一部分,我正在使用csv-parse读取和操作大型(约5.5GB,800万行)csv文件。我让这个过程运行得相对平稳,但我被困在一个项目上——捕捉由不一致的列数引发的错误。 我之所以使用管道函数,是因为它与应用程序的其余部分配合得很好,但我的问题是,如何将解析器抛出的错误重定向到日志并允许该过程继续? 我认识到,我可以使用选项跳过列数不一致的记录,该选项几乎就足够了。问题

  • 我的存储库中有一个带有Jenkinsfile的多分支管道,我能够拥有我的CI工作流程(构建 为了设置我的夜间构建,我添加了 但是现在,如果我们只是在晚上构建由cron表达式触发的作业,如何执行分析步骤呢? 我的简化构建阶段如下所示: