当前位置: 首页 > 知识库问答 >
问题:

无界PCollection的Apache波束集成测试

蓝恩
2023-03-14

我们正在为Apache Beam管道构建一个集成测试,并遇到了一些问题。有关上下文,请参见下文...

有关我们管道的详细信息:

    null
    null

这是一个简单的集成测试,它将验证我们的管道作为一个整体的行为是预期的。

我们目前面临的问题是,当我们运行管道时,它正在阻塞。我们使用的是Directrunnerpipeline.run()(而不是pipeline.run().waituntilfinish()),但是测试似乎在运行管道后挂起。因为这是一个无界的pcollection(以流模式运行),所以管道不会终止,因此没有到达管道之后的任何代码。

所以,我有几个问题:

让我知道如果我可以提供任何额外的代码/上下文-谢谢!

共有1个答案

酆英达
2023-03-14

通过将设置isblockonrun管道选项传递为false,可以使用Directrunner异步运行管道。只要保持对返回的pipelineResult的引用可用,就可以在该结果上调用cancel()来停止管道。

对于你的第三个问题,你的设置似乎是合理的。但是,如果希望对管道进行较小规模的测试(需要较少的组件),可以将所有处理逻辑封装在自定义的PTransform中。这个ptransform应该从输入源获取已经完全解析的输入,并为输出接收器生成尚未解析的输出。

完成此操作后,您可以使用create(通常不执行触发器操作)或teststream(根据如何构造teststream)与directrunner来生成有限数量的输入数据,将此处理ptransform应用于该pcollection,并在输出pcollection上使用passert来验证管道是否生成了预期的输出。

有关测试的更多信息,Beam网站在编程指南和一篇关于使用TestStream测试管道的博客文章中有关于这些测试风格的信息。

 类似资料:
  • a)从有界源读取,在数据流中运行时,PCollection的大小可以有多大?b)当处理大数据时,假设PCollection的大约5000万个数据试图查找另一个PCollection的大约1000万个数据。这能做到吗?beam/dataflow的性能有多好?在一个ParDo函数中,假设我们只能传递一个输入并返回一个输出,如何基于两个输入数据集执行查找?我试图查看Dataflow/beam,类似于任何

  • 我正在构建一个读取Avro通用记录的管道。为了在各个阶段之间传递GenericRecord,我需要注册avrocoder。文档表明,如果我使用通用记录,模式参数可以是任意的:https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/avrocoder.html#of-java.lang.class-org.a

  • 在我的数据流作业中,我需要初始化配置工厂,并在实际处理开始之前将某些消息记录在审核日志中。 我将配置工厂初始化代码审计日志记录放在父类PlatformInitializer中,并在我的主管道类中扩展它。 因此,我还必须在我的管道类中实现可序列化接口,因为beam抛出了错误-<代码>java。io。NotSerializableException:组织。德维塔姆。自定义作业 在PlatformIni

  • 假设我们有一些嵌套列表: 我们可以像这样轻松地在Stream API中进行翻盖映射: 但是用“FlatMapElements”做这件事,真是一团糟: 我们能用平面贴图功能做得更好吗<一个简单的平面图工作不应该那么复杂,所以我想我遗漏了一些东西 我甚至无法替换。via(列表-

  • 我正在将我的google dataflow java 1.9迁移到beam 2.0,并尝试使用BigtableIO。写 在大舞台前的巴黎公园里,我正在努力让它变得更容易接受。 上述代码引发以下异常InvalidProtocolBufferException:协议消息结束组标记与预期标记不匹配 v是对象列表(Vitals.class)。hbase api使用Put方法创建变异。如何创建将与Bigta

  • 嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。 这是我在cmd提示符中使用的maven命令, 这是我用来创建管道和设置选项的代码段。