当前位置: 首页 > 知识库问答 >
问题:

apache beam如何创建支持模式的PCollection

唐珂
2023-03-14

我试图使用apache beam读取两个数据集,如果第二个数据集中存在与ID匹配的行,则更新第一个数据集。以下是梁管道:


class MapDataFrameRow(beam.DoFn):

    def process(self, element):
        key = element.id
        yield key, element


class MergeTuples(beam.DoFn):

    def process(self, element):
        key, data = element
        if len(data[1]) > 0:
            yield data[1][0]
        else:
            yield data[0][0]

def run(pipeline_options, state_file, gcs_input_file_path, gcs_output_path):
    with beam.Pipeline(options=pipeline_options) as pipeline:

        current_state_df = pipeline | 'Read current state CSV' >> read_csv(state_file)
        updates_df = pipeline | 'Read data updates CSV' >> read_csv(gcs_input_file_path)

        # Translate beam dataframe to pcollections
        #  MapDataFrameRow is a DoFn that map the ID as the tuple key.
        cs_pc = to_pcollection(current_state_df, include_indexes=False) | "Map #1 to KV" >> beam.ParDo(MapDataFrameRow())
        nu_pc = to_pcollection(updates_df, include_indexes=False) | "Map #2 to KV" >> beam.ParDo(MapDataFrameRow())

        merged = (
                (cs_pc, nu_pc)
                | 'group by key' >> beam.CoGroupByKey()
                | 'keep most recent' >> beam.ParDo(MergeTuples())
                # | 'To rows' >> beam.Map(lambda word: beam.Row(??))
                | beam.ParDo(print)
        )


        df = to_dataframe(merged)
        df.to_csv(gcs_output_path)

代码产生以下输出:

BeamSchema_c46d9a9f_a000_453e_87fe_7f4b372c21a1(id=143, area=130.0, .... )
BeamSchema_c46d9a9f_a000_453e_87fe_7f4b372c21a1(id=144, area=130.0, ... )
BeamSchema_c46d9a9f_a000_453e_87fe_7f4b372c21a1(id=145, area=130.0, ... )
...

遗憾的是,我无法将合并的pcollection映射到数据帧中,出现以下错误:

TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas

我已尝试使用“到行”

编辑:问题似乎是to_dataframe(合并)不喜欢使用由两个to_pcollections生成的两个不同的BeamSchema_c46d9a9f_xxxxBeamSchema_3369b0_xxxx。换句话说,我正在创建两个单独的p集合,然后我从一个源或另一个源中选择,然后尝试创建一个新的数据帧,但是因为它们看起来像是单独的模式,他不喜欢它。合并两个数据帧的方法是什么?

共有1个答案

阎博易
2023-03-14

首先,我看到您应用了变换光束。ParDo(打印)以生成合并的PCollection。这意味着您尝试转换的PCollection将包含此函数返回的对象。是否使用自定义打印功能在打印后返回原始对象?

如果上面没有解决问题,您必须确保Beam Python可以从PCollection表示的对象类型推断Schema。对于自定义类型,这可以通过将对象转换为NamedTuple来完成。有关更多详细信息,请参阅此处。

这应该允许to_dataframe正确发现PCollection包含的对象类型的模式。

 类似资料:
  • 我正在尝试创建一个DateTimeFormatter对象,其模式符合以下时间表达式:2016-07-22T00:00:00.000-05:00。我正在尝试使用带有上述输入字符串的DateTimeFormatter类创建DateTime对象。 我已经尝试了下面表达式的许多不同版本,但目前被困在时区片"-05:00",在那里我得到了我的jUnit测试用例的错误: 我使用的当前格式模式是: 我也尝试过:

  • 我有一个表,其列如下所示: 符号 区域 国家 位置 日期 计数 我创建了如下表: null 简单地说,我想要支持where子句中所有或任意数量列的表结构。 在卡桑德拉有可能做到这一点吗?

  • 嗨得到这个错误,而试图建立一个项目我导入android工作室 在ProductFlavor_装饰的{name=main,minSdkVersion=ApiVersionImpl{mApiLevel=14,mCodename='null'},targetSdkVersion=ApiVersionImpl{mApiLevel=19,mCodename='null'},renderscriptTarge

  • 我有一个抽象类“A”,它具有枚举类型“OutputType”的属性,还有一个抽象方法calculation(),它需要执行certin计算,并根据OutputType值以double[]**形式输出结果。 我也有一个类定义为D1,D2,D3...D20,从A派生,其中每个Di类实现A的计算()方法不同。 问题是并非所有的OutputType值(calculation()输出类型)都在每个Di中都受

  • 我需要编写一个嵌入的XML模式,即模式在与数据相同的XML中定义。 我正在试图理解如何正确地执行它,但到目前为止,我无法获得一个简单的示例来通过验证。以下是我尝试使用的一个简单示例XML with inline schema: (注意:XML结构(例如根/项)已经过时,因此我无法在数据元素上使用命名空间。) 但是当我通过w3运行XML时。org XML Schema Validator,XML验证

  • 问题内容: 例如,我有DBManager.java Singleton类,必须将其部署在集群环境中。这是一个基于Web的应用程序,具有以下部署策略 Apache负载平衡器-> Tomcat 6(群集中有3个服务器)。 我必须为3个tomcat实例维护一个DBManager实例。 我的代码是 我一直在寻找解决此问题的方法,并发现类似JGroups API的东西。可以使用JGroups实现吗?任何想法