当前位置: 首页 > 知识库问答 >
问题:

在Apache Beam中添加2个Dofn之间的依赖关系

慕烨烁
2023-03-14

是否有任何方法我可以创建2个Dofn之间的依赖关系,以便它将等待第一个Dofn方法完成,然后第二个Dofn方法将运行。只是想知道如何实现这个用例。

共有1个答案

林意蕴
2023-03-14

也许有一个更干净的方法来做到这一点,但我注意到做以下事情会达到你想要的效果:

将第一个DoFn的输出路由到一个计数器,然后将该计数器的输出作为侧输入传递到第二个DoFn的ParDo中

class DoFn2(apache_beam.DoFn):
    def process(self, element, count_do_fn_1_output, *args, **kwargs):
        # ...

do_fn_1_output = do_fn_1_input | 'do fn 1' >> apache_beam.ParDo(DoFn1())

count_do_fn_1_output = (
    do_fn_1_output 
    | 'count do_fn_1_output' >> apache_beam.combiners.Count.Globally())

do_fn_2_output = (
    do_fn_1_output 
    | 'do fn 2' >> apache_beam.ParDo(DoFn2(), count_do_fn_1_output=apache_beam.pvalue.AsSingleton(count_do_fn_1_output)))
 类似资料:
  • 因此,自从添加新的Room android架构库以来,这种情况已经开始发生。我在AppDatabase_Impl没有过期时遇到问题,我通过在注释中添加kapt来修复它: < li>Android Room持久性库和Kotlin < li >在Kotlin中实现房间持久性库 < in Kotlin中的房间持久性库实现(Gradle错误) 我怀疑其他错误是由于AS、Kotlin和Java 8造成的,所

  • 添加依赖 在 pom.xml 中引入 网聚宝监控客户端 的依赖。 <!--网聚宝 监控客户端--> <dependency> <groupId>wangjubao.monitoring</groupId> <artifactId>monitor-client</artifactId> <version>1.4-SNAPSHOT</version> </dependency>

  • 问题内容: 如何获取我拥有的jar文件并将其添加到Maven 2的依赖系统中?我将成为此依赖项的维护者,并且我的代码需要在类路径中使用此jar,以便对其进行编译。 问题答案: 您必须分两步执行此操作: 1.给您的JAR一个groupId,artifactId和版本,然后将其添加到您的存储库中。 如果您没有内部存储库,而只是试图将JAR添加到本地存储库,则可以使用任意groupId / artifa

  • 我有一些OSGi包(简单的OSGi,没有eclipse依赖项)和一个maven构建,使用pax runner在Knopflerfish服务器中启动它们。 maven构建和启动包工作得非常好,但是只能通过Pax runner(< code > mvn Pax:provision )。 为了调试捆绑包,最好直接从eclipse中启动它们,否则我的断点不会触发(似乎很明显,因为pax:provision

  • 我面临着以下问题。我有两个Gradle项目(ProjectA和ProjectB)在同一个层次结构上,没有任何根项目。由于ProjectA依赖于ProjectB,我尝试将其建模如下: ProjectA的settings.gradle: ProjectA的build.gradle: 然而,当我在ProjectA上执行gradle build时,我得到了以下错误: 我必须在ProjectB中定义某种默认

  • 我想知道如何在Gradle中向特定的ProductFraile和buildType添加依赖关系。例如,我有ProductFlavel和构建类型,如何添加对任务的依赖项? 或: 这里没有错误,但任务是针对每一种风格和构建类型执行的,非常混乱。