我在Dataflow(Apache beam)上创建了一个管道,以便在Google BigQuery上读写数据,但是我在创建DAG时遇到了问题,就像我在Airflow上做的那样。
这是我的代码中的一个示例:
# define pipeline
p = beam.Pipeline(argv=pipeline_args)
# execute query_1
query_result_gps = ( p | 'ReadFromBQ GPS_data' >> ... )
# write result from query_1 on BigQuery
output_gps = ( query_result_gps | 'WriteToBQ GPS_data' >> ... )
# execute query_2
query_result_temperature = (output_gps
| 'ReadFromBQ temperature_data' >> ... )
# write result from query_2
ouput_temperature = ( query_result_temperature | 'WriteToBQ temperature_data' >> ... )
我希望按顺序执行这些任务,而Dataflow是并行执行的
我如何让它们按顺序执行?
我假设您是从以下大查询中读取的:
count = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input_table))
我深入研究了apache_beam源代码,看起来它们的源转换忽略了输入pcollection,这就是为什么它们是并行设置的。
请参见def展开(self,pbegin)的最后一行:
:
class Read(ptransform.PTransform):
"""A transform that reads a PCollection."""
def __init__(self, source):
"""Initializes a Read transform.
Args:
source: Data source to read from.
"""
super(Read, self).__init__()
self.source = source
def expand(self, pbegin):
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.transforms import util
assert isinstance(pbegin, pvalue.PBegin)
self.pipeline = pbegin.pipeline
debug_options = self.pipeline._options.view_as(DebugOptions)
if debug_options.experiments and 'beam_fn_api' in debug_options.experiments:
source = self.source
def split_source(unused_impulse):
total_size = source.estimate_size()
if total_size:
# 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards
chunk_size = max(1 << 20, 1000 * int(math.sqrt(total_size)))
else:
chunk_size = 64 << 20 # 64mb
return source.split(chunk_size)
return (
pbegin
| core.Impulse()
| 'Split' >> core.FlatMap(split_source)
| util.Reshuffle()
| 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
split.source.get_range_tracker(
split.start_position, split.stop_position))))
else:
# Treat Read itself as a primitive.
return pvalue.PCollection(self.pipeline)
# ... other methods
为什么你需要它们依次发生?你好像在给一张桌子写信,然后从另一张桌子读东西?
如果您真的需要顺序地执行此操作,也许像这样将其子类化read
就可以了
class SequentialRead(Read):
def expand(self, pbegin):
return pbegin
创建数据流主要包括如下两个部分: 1. 获取相关信息 主要为获取FDS Bucket相关的信息 2. 创建/迁移Topic 数据最终需要收集到Talos 的Topic中,因此需要首先创建Topic,并迁移到生态云账号体系下;关于Talos相关,可以参见Talos-流式消息队列 3. 配置数据流 配置数据流需要的信息
我的狗看起来像这样 我的DAG正在执行一个jar文件。jar文件包含运行数据流作业的代码,该作业将数据从GCS写入BQ。jar本身执行成功。 当我尝试执行airflow作业时,我看到以下错误 我做了更多的挖掘,我可以看到气流 正如您可以看到jobs之后的最后一个参数是asia east,因此我觉得airflow job正在尝试使用我在默认参数中提供的区域来搜索数据流job的状态。不确定这是否是正在
如果每个事件间隔为1秒,并且有2秒的滞后,那么我希望示例输入和输出如下所示。 输入:1,2,3,4,5,6,7... 输出:NA,NA,1,2,3,4,5...
我有自己的Spring Cloud数据流处理器,里面有Python,我使用这个示例作为指导:https://dataflow.Spring.io/docs/recipes/polyglot/processor/。然后我想缩放并创建其中的三个处理器,因此使用创建了3个Python内部的POD。我稍微修改了示例中的一段代码:当我创建一个Kafka消费者时,我也会传递一个组id,因此消息应该是负载平衡的
我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。
使用fromElements函数创建数据流时出错 下面是探险- 原因:java.io.IOException:无法从源反序列化元素。如果您使用的是用户定义的序列化(值和可写类型),请检查序列化函数。序列化程序是org.apache.flink.api.java.typeutils.runtime.kryo.kryoSerializer@599fcdda在org.apache.flink.strea