当前位置: 首页 > 知识库问答 >
问题:

无法创建支持架构的Pcollection以在Google数据流中运行

梁丘招
2023-03-14

出于某种原因,每次我尝试在我的管道(ApacheBeamPython)中创建schema ware PCollection时,使用Beam。选择()。Row()并尝试在数据流上运行它,我得到一个如下所示的致命错误,该错误导致工作进程在VM上死亡。奇怪的是,当我使用Directrunner在本地运行时,我没有遇到这个错误。

我想确保这不是我在自定义代码中犯的愚蠢错误。我尝试了谷歌的例子,比如下面的片段简化自:https://github.com/apache/beam/blob/034ccdf93a0c5dfe6629501b456105ac47047e44/sdks/python/apache_beam/examples/sql_taxi.py

import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT_ID = <my_project_id>

def run(pipeline_args):
  pipeline_options = PipelineOptions(
      pipeline_args, project=PROJECT_ID, save_main_session=True, streaming=True)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | beam.io.ReadFromPubSub(
            topic='projects/pubsub-public-data/topics/taxirides-realtime',
            timestamp_attribute="ts").with_output_types(bytes)
        | "Parse JSON payload" >> beam.Map(json.loads)
        # Use beam.Row to create a schema-aware PCollection
        # ** This gives me trouble everytime when run through Dataflow **
        | "Create beam Row" >> beam.Map(
            lambda x: beam.Row(
                ride_status=str(x['ride_status']),
                passenger_count=int(x['passenger_count']))))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  import argparse

  parser = argparse.ArgumentParser()
  _, pipeline_args = parser.parse_known_args()

  run(pipeline_args)

完全相同的问题。在Directrunner中本地运行时非常好;在数据流中运行时崩溃。我使用的是ApacheBeam2.31。0 python SDK

我为此伤了好几天脑筋。任何帮助或指导都将不胜感激!!

共有1个答案

林华皓
2023-03-14

多亏了@大ドア東, 我能够使用NamedTuple让它在Google数据流上工作。

然而,我仍然对那束光束感到震惊。行()或梁。Select()在Apache Beam python SDK的数据流上根本不起作用,我在stackoverflow上找不到类似的抱怨。

 类似资料:
  • 问题内容: 我无法启动启动时自动启动数据库架构的Spring Boot。 这是我的application.properties: 这是我的Application.java: 这是一个示例实体: 有什么想法我做错了吗? 问题答案: 有几种可能的原因: 您的实体类位于与之相对的同一个子包中,或者在一个子包中,如果没有,则您的spring应用不会看到它们,因此不会在db中创建任何内容 检查您的配置,似乎

  • 我试图使用apache beam读取两个数据集,如果第二个数据集中存在与ID匹配的行,则更新第一个数据集。以下是梁管道: 代码产生以下输出: 遗憾的是,我无法将合并的pcollection映射到数据帧中,出现以下错误: 我已尝试使用“到行” 编辑:问题似乎是不喜欢使用由两个s生成的两个不同的和。换句话说,我正在创建两个单独的p集合,然后我从一个源或另一个源中选择,然后尝试创建一个新的数据帧,但是因

  • 问题内容: MyBatis是否有任何功能可以像Hibernate一样从类模型创建SQL模式? 我在Google中寻找该信息,但只找到有关MyBatis Generator的信息(http://mybatis.github.io/generator/)。这个工具对于从SQL模式生成Java模型似乎很有用,这与我想要的相反。 问题答案: MyBatis可以创建数据库架构吗? 恐怕不是。为此,您需要一个

  • tl;dr Apache Beam管道步骤涉及构建docker图像;如何使用谷歌数据流运行这个管道?存在哪些替代方案? 我目前正在尝试使用谷歌的数据流服务和apache梁(python)迈出第一步。 简单的例子很简单,但当外部软件依赖性开始发挥作用时,事情就会让我感到困惑。似乎可以使用自定义docker容器来设置自己的环境[1][2]。虽然这对大多数依赖项来说都很好,但如果依赖项是docker本身

  • 有人可以帮助我解决这个问题,我与火花数据帧? 当我执行myFloatRDD时。toDF()我收到一个错误: 类型错误:无法推断类型的架构:类型“浮动” 我不明白为什么... 例子: 谢谢

  • 我使用spring-boot-starter-data-solr,并希望利用Spring Data Solr的schmea cration支持,如文档中所述: 每当刷新应用程序上下文时,自动架构填充都会检查您的域类型,并根据属性配置将新字段填充到索引中。这要求 solr 在无架构模式下运行。 但是,我无法实现这一目标。据我所知,Spring启动器不会在@EnableSolrRepositories