当前位置: 首页 > 知识库问答 >
问题:

创建DAG数据流(apache Beam)

李和裕
2023-03-14

我在Dataflow(Apache beam)上创建了一个管道,以便在Google BigQuery上读写数据,但是我在创建DAG时遇到了问题,就像我在Airflow上做的那样。

这是我的代码中的一个示例:

# define pipeline
p = beam.Pipeline(argv=pipeline_args)
# execute query_1
query_result_gps = ( p | 'ReadFromBQ GPS_data' >> ... )
# write result from query_1 on BigQuery
output_gps = ( query_result_gps | 'WriteToBQ GPS_data' >> ... )
# execute query_2
query_result_temperature = (output_gps 
                                    | 'ReadFromBQ temperature_data' >> ... )
# write result from query_2
ouput_temperature = ( query_result_temperature | 'WriteToBQ temperature_data' >> ... )

我希望按顺序执行这些任务,而Dataflow是并行执行的

我如何让它们按顺序执行?

共有1个答案

燕野
2023-03-14

我假设您是从以下大查询中读取的:

count = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input_table))

我深入研究了apache_beam源代码,看起来它们的源转换忽略了输入pcollection,这就是为什么它们是并行设置的。

请参见def展开(self,pbegin)的最后一行::

class Read(ptransform.PTransform):
  """A transform that reads a PCollection."""

  def __init__(self, source):
    """Initializes a Read transform.

    Args:
      source: Data source to read from.
    """
    super(Read, self).__init__()
    self.source = source

  def expand(self, pbegin):
    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.transforms import util

    assert isinstance(pbegin, pvalue.PBegin)
    self.pipeline = pbegin.pipeline

    debug_options = self.pipeline._options.view_as(DebugOptions)
    if debug_options.experiments and 'beam_fn_api' in debug_options.experiments:
      source = self.source

      def split_source(unused_impulse):
        total_size = source.estimate_size()
        if total_size:
          # 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards
          chunk_size = max(1 << 20, 1000 * int(math.sqrt(total_size)))
        else:
          chunk_size = 64 << 20  # 64mb
        return source.split(chunk_size)

      return (
          pbegin
          | core.Impulse()
          | 'Split' >> core.FlatMap(split_source)
          | util.Reshuffle()
          | 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
              split.source.get_range_tracker(
                  split.start_position, split.stop_position))))
    else:
      # Treat Read itself as a primitive.
      return pvalue.PCollection(self.pipeline)

# ... other methods

为什么你需要它们依次发生?你好像在给一张桌子写信,然后从另一张桌子读东西?

如果您真的需要顺序地执行此操作,也许像这样将其子类化read就可以了

class SequentialRead(Read):
  def expand(self, pbegin):
      return pbegin
 类似资料:
  • 创建数据流主要包括如下两个部分: 1. 获取相关信息 主要为获取FDS Bucket相关的信息 2. 创建/迁移Topic 数据最终需要收集到Talos 的Topic中,因此需要首先创建Topic,并迁移到生态云账号体系下;关于Talos相关,可以参见Talos-流式消息队列 3. 配置数据流 配置数据流需要的信息

  • 我的狗看起来像这样 我的DAG正在执行一个jar文件。jar文件包含运行数据流作业的代码,该作业将数据从GCS写入BQ。jar本身执行成功。 当我尝试执行airflow作业时,我看到以下错误 我做了更多的挖掘,我可以看到气流 正如您可以看到jobs之后的最后一个参数是asia east,因此我觉得airflow job正在尝试使用我在默认参数中提供的区域来搜索数据流job的状态。不确定这是否是正在

  • 如果每个事件间隔为1秒,并且有2秒的滞后,那么我希望示例输入和输出如下所示。 输入:1,2,3,4,5,6,7... 输出:NA,NA,1,2,3,4,5...

  • 我有自己的Spring Cloud数据流处理器,里面有Python,我使用这个示例作为指导:https://dataflow.Spring.io/docs/recipes/polyglot/processor/。然后我想缩放并创建其中的三个处理器,因此使用创建了3个Python内部的POD。我稍微修改了示例中的一段代码:当我创建一个Kafka消费者时,我也会传递一个组id,因此消息应该是负载平衡的

  • 我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。

  • 使用fromElements函数创建数据流时出错 下面是探险- 原因:java.io.IOException:无法从源反序列化元素。如果您使用的是用户定义的序列化(值和可写类型),请检查序列化函数。序列化程序是org.apache.flink.api.java.typeutils.runtime.kryo.kryoSerializer@599fcdda在org.apache.flink.strea