当前位置: 首页 > 知识库问答 >
问题:

如何在Apache beam python中创建任务之间的依赖关系

祖利
2023-03-14
import apache_beam as beam
import logging
from apache_beam.options.pipeline_options import PipelineOptions

class Sample(beam.PTransform):
    def __init__(self, index):
        self.index = index

    def expand(self, pcoll):
        logging.info(self.index)
        return pcoll

class LoadData(beam.DoFn):
    def process(self, context):
        logging.info("***")

if __name__ == '__main__':

    logging.getLogger().setLevel(logging.INFO)
    pipeline = beam.Pipeline(options=PipelineOptions())

    (pipeline
        | "one" >> Sample(1)
        | "two: Read" >> beam.io.ReadFromText('sample.json')
        | "three: show" >> beam.ParDo(LoadData())
        | "four: sample2" >> Sample(2)
    )
    pipeline.run().wait_until_finish()

我希望它会按照一、二、三、四的顺序执行。但它是在并行模式下运行的。

以上代码的输出:

INFO:root:Missing pipeline option (runner). Executing pipeline using the 
default runner: DirectRunner.
INFO:root:1
INFO:root:2
INFO:root:Running pipeline with DirectRunner.
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***

共有1个答案

齐航
2023-03-14

根据Dataflow的文档:

当管道运行者为分布式执行构建实际管道时,管道可能会被优化。例如,将某些转换一起运行或以不同的顺序运行可能在计算上更有效。Dataflow服务完全管理管道执行的这一方面。

同样根据Apache Beam的文档:

 类似资料:
  • 因此,自从添加新的Room android架构库以来,这种情况已经开始发生。我在AppDatabase_Impl没有过期时遇到问题,我通过在注释中添加kapt来修复它: < li>Android Room持久性库和Kotlin < li >在Kotlin中实现房间持久性库 < in Kotlin中的房间持久性库实现(Gradle错误) 我怀疑其他错误是由于AS、Kotlin和Java 8造成的,所

  • 我花了5个多小时试图解决这个问题。有什么问题吗? 我补充道: 组织。格拉德尔。configureondemand=true 但问题依然存在 建筑gradle(模块:应用程序) 建造。gradle(项目:myproject) //顶级构建文件,您可以在其中添加所有子项目/模块通用的配置选项。

  • 我在intellij工作,使用spring boot和gradle。 我有一个使用其他两个库的服务,我似乎遇到了一些传递依赖性问题。 该服务中的spring boot版本已从2.1.2更新到2.1.4,这使得该服务不再工作。 我能够在gradle中干净地构建这个项目,但是一旦我启动它,在运行时我得到…无法解决org.slf4j: slf4j-api: 1.7.26。 我按照要求手动将库中的 slf

  • 我有一个JavaExec任务,它运行一个Java类来生成文件。源代码生成器需要搜索CLASSPATH来找到它用来确定要生成什么的某些类。它需要当前项目的类在CLASSPATH中。 我有这个任务: 当我运行此任务时,我得到“以下任务之间的循环依赖关系:”。很明显,我指的是类路径本身。 如果使用此任务,则项目类不在类路径中: 在这个问题上,我已经兜了几个小时的圈子,真的需要一些帮助。 提前谢谢!

  • 问题内容: 我正在使用Airflow计划批处理作业。我有一个DAG(A)每晚运行,另一个DAG(B)每月运行一次。B取决于A已成功完成。但是B需要很长时间才能运行,因此我想将其保存在单独的DAG中,以实现更好的SLA报告。 如何使运行DAG B依赖于同一天DAG A的成功运行? 问题答案: 您可以使用名为ExternalTask​​Sensor的运算符来实现此行为。将安排DAG(B)中的任务(B1

  • 问题内容: 最近,我接受了用JSF编写的Web系统的支持和编程。代码有点凌乱和多余,是的,​​不存在任何文档。 该系统具有40多个jar库,由于旧版本和测试,其中大多数是冗余的。要删除一个jar,我必须检查它是否未在代码中导入,因此我在代码中搜索了jar导入路径(我正在使用IntelliJ IDE),确保未使用它,然后将其删除。 但是,在编译代码后,在测试过程中发生了许多运行时错误。我发现我删除了