我们正在为Apache Beam管道构建一个集成测试,并遇到了一些问题。有关上下文,请参见下文...
有关我们管道的详细信息:
这是一个简单的集成测试,它将验证我们的管道作为一个整体的行为是预期的。
我们目前面临的问题是,当我们运行管道时,它正在阻塞。我们使用的是Directrunner
和pipeline.run()
(而不是pipeline.run().waituntilfinish()
),但是测试似乎在运行管道后挂起。因为这是一个无界的pcollection
(以流模式运行),所以管道不会终止,因此没有到达管道之后的任何代码。
所以,我有几个问题:
让我知道如果我可以提供任何额外的代码/上下文-谢谢!
通过将设置isblockonrun
管道选项传递为false
,可以使用Directrunner
异步运行管道。只要保持对返回的pipelineResult
的引用可用,就可以在该结果上调用cancel()
来停止管道。
对于你的第三个问题,你的设置似乎是合理的。但是,如果希望对管道进行较小规模的测试(需要较少的组件),可以将所有处理逻辑封装在自定义的PTransform
中。这个ptransform
应该从输入源获取已经完全解析的输入,并为输出接收器生成尚未解析的输出。
完成此操作后,您可以使用create
(通常不执行触发器操作)或teststream
(根据如何构造teststream
)与directrunner
来生成有限数量的输入数据,将此处理ptransform
应用于该pcollection
,并在输出pcollection
上使用passert
来验证管道是否生成了预期的输出。
有关测试的更多信息,Beam网站在编程指南和一篇关于使用TestStream
测试管道的博客文章中有关于这些测试风格的信息。
a)从有界源读取,在数据流中运行时,PCollection的大小可以有多大?b)当处理大数据时,假设PCollection的大约5000万个数据试图查找另一个PCollection的大约1000万个数据。这能做到吗?beam/dataflow的性能有多好?在一个ParDo函数中,假设我们只能传递一个输入并返回一个输出,如何基于两个输入数据集执行查找?我试图查看Dataflow/beam,类似于任何
我正在构建一个读取Avro通用记录的管道。为了在各个阶段之间传递GenericRecord,我需要注册avrocoder。文档表明,如果我使用通用记录,模式参数可以是任意的:https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/avrocoder.html#of-java.lang.class-org.a
在我的数据流作业中,我需要初始化配置工厂,并在实际处理开始之前将某些消息记录在审核日志中。 我将配置工厂初始化代码审计日志记录放在父类PlatformInitializer中,并在我的主管道类中扩展它。 因此,我还必须在我的管道类中实现可序列化接口,因为beam抛出了错误-<代码>java。io。NotSerializableException:组织。德维塔姆。自定义作业 在PlatformIni
假设我们有一些嵌套列表: 我们可以像这样轻松地在Stream API中进行翻盖映射: 但是用“FlatMapElements”做这件事,真是一团糟: 我们能用平面贴图功能做得更好吗<一个简单的平面图工作不应该那么复杂,所以我想我遗漏了一些东西 我甚至无法替换。via(列表-
我正在将我的google dataflow java 1.9迁移到beam 2.0,并尝试使用BigtableIO。写 在大舞台前的巴黎公园里,我正在努力让它变得更容易接受。 上述代码引发以下异常InvalidProtocolBufferException:协议消息结束组标记与预期标记不匹配 v是对象列表(Vitals.class)。hbase api使用Put方法创建变异。如何创建将与Bigta
嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。 这是我在cmd提示符中使用的maven命令, 这是我用来创建管道和设置选项的代码段。