当前位置: 首页 > 知识库问答 >
问题:

在调试器中调试apache beam/dataflow?

贺浩壤
2023-03-14

这首先与这篇文章高度相关->如何在Dataflow中进行这种类型的测试(在twitter上称为功能测试)?

我们在生产中有一些类似的代码

@Override
public PDone expand(PCollectionTuple in) {
    final RosterPipelineOptions options = (RosterPipelineOptions) in.getPipeline().getOptions();
    try {
        final Schema schema = new Schema.Parser().parse(PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc"));
        final String schemaStr = new BufferedReader(new InputStreamReader(PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc")))
            .lines().collect(Collectors.joining("\n"));

        final PCollection<Validated<PractitionerStandardOutputDto>> validOutputs = in.get(PractitionerStandardOutputConverter.VALID_OUTPUTS_TAG);
        final PCollection<Validated<PractitionerStandardOutputDto>> invalidOutputs = in.get(PractitionerStandardOutputConverter.INVALID_OUTPUTS_TAG);

        final PCollection<GenericRecord> validRecords = validOutputs.apply(
            "Transform Valid Standard Output into Generic Rows", ParDo.of(new GenericRecordConverter(schemaStr)));
        final PCollection<GenericRecord> invalidRecords = invalidOutputs.apply(
            "Transform Invalid Standard Output into Generic Rows", ParDo.of(new GenericRecordConverter(schemaStr)));

        validRecords
            .setCoder(AvroCoder.of(GenericRecord.class, schema))
            .apply("Write Records", AvroIO.writeGenericRecords(schema)
                .to(options.getValidOutput())
                .withoutSharding());

        final PCollection<String> invalidRows = invalidRecords
            .setCoder(AvroCoder.of(GenericRecord.class, schema))
            .apply("Convert Error Avro to Csv", ParDo.of(new AvroToCsvConvertFn(schemaStr, ",")));
        invalidRows.apply("Write Error Records to Csv",
            TextIO.write().to(options.getInvalidOutput()).withoutSharding());

        return PDone.in(in.getPipeline());
    }
    catch (IOException e) {
        SneakyThrow.sneak(e); return null;
    }
}
  • 这是编写可调试的Apache-Beam/Dataflow代码的最佳方式吗?我们可以逐步通过这些代码并轻松地看到我们的bug在哪里?
  • 是否有其他方法可以方便地调试它,因为我怀疑在应用stuff时,“真正的执行”发生在该方法之后?

谢谢,迪恩

共有1个答案

乌翔
2023-03-14

总的来说,就像你的另一个问题一样,我的建议是:

  1. 要单步执行管道,可以编写一个用IDE运行的单元测试,它将在Directrunner中运行。这使您可以轻松地单步执行您的管道。这不是在数据流中运行的,而是在本地运行的--它仍然很有价值。

您可以在expand中设置断点,这些断点将在管道构建时命中。您可以在DoFn的进程中设置断点,或者在源代码的拆分读取中设置断点--这些断点将在流水线执行时命中。

一些最有价值的测试工具是passertTestStreamTestPipeline。我建议您查看我共享的页面,看看这些实用程序是否会有所帮助。

对于特定的管道,我可能认为可以将PTransform分成更小的部分,并为每个部分编写简单的单元测试。

 类似资料:
  • 我无法对WebStorm中的TypeScript文件进行远程调试。我目前使用的是WebStorm 6.0、TS编译器0.8.3和Chrome JetBrains插件0.5.7。 出于调试的目的,我所使用的服务器是一个简单的python服务器,使用命令在我的本地计算机上与源文件位于同一目录中。

  • 调试器 调试工具都内置于主流浏览器中(Firefox 中需独立下载 Firebug)。更多关于 Google Chrome DevTools 的信息可以在这里找到。

  • 我正在使用Angular 9、Node v14和e2e测试,使用Cucumber、量角器和量角器Cucumber框架。 量角器。conf.js tsconfig。e2e。json 发射json package.json 我可以在VSCode中启动调试器,但问题是每个断点都是未绑定的断点。 我还尝试在tsconfig中添加。e2e。json和我也尝试加入launch。json 问题似乎是ts节点在内

  • 稳定性: 2 - 稳定的 Node.js 包含一个进程外的调试工具,可以通过一个基于 TCP 协议且内置的调试客户端访问。 要使用它,需要以 debug 参数启动 Node.js,并带上需要调试的脚本的路径;然后会出现一个提示,表明已成功启动调试器: $ node debug myscript.js < Debugger listening on 127.0.0.1:5858 connecting

  • 应用程序的开发过程中调试是必不可少的一个环节,因此有一个好的调试器是非常重要的,可惜的是,Go 在这方面的发展还不是很完善。目前可用的调试器是 gdb,最新版均以内置在集成开发环境 LiteIDE 和 GoClipse 中,但是该调试器的调试方式并不灵活且操作难度较大。 如果你不想使用调试器,你可以按照下面的一些有用的方法来达到基本调试的目的: 在合适的位置使用打印语句输出相关变量的值(print

  • 我需要在VsCode上调试我的反应原生应用程序,我是新的反应原生开发...:)我搜索并遵循不同的方法,但没有运气...:(首先,我遵循此方法https://medium.com/@Tunvirrahmantushs/react-nate-debug-with-vscode-in-imple-steps-bf39b6331e67并遵循此方法https://www.youtube.com/watch?