当前位置: 首页 > 知识库问答 >
问题:

带JdbcIO的Apache Beam流水线

吴和硕
2023-03-14

我有一个Apache Beam管道,它在读取BigQuery后试图写入Postgres。代码使用JdbcIO连接器和数据流运行器。我使用的是Python 3.8.7和Apache Beam 2.28.0

我使用的是默认扩展服务。我也尝试运行一个自定义扩展服务,但仍然得到相同的错误。你知道吗?

def export_to_postgres(user_options, pipeline_options, password):
    """Creates a pipeline that writes entities to postgres."""

    TeacherRow = NamedTuple(
        "TeacherRow",
        [
            ("teacher_id", str),
            ("first_name", str),
            ("last_name", str),
            ("total_all_publisher", int)
        ])

    coders.registry.register_coder(TeacherRow, coders.RowCoder)

    p = beam.Pipeline(options=pipeline_options)

    (p
     | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
                query=user_options.query_src,
                use_standard_sql=True
            )
     | beam.Map(lambda x:
                TeacherRow(teacher_id=str(x.teacher_id),
                              first_name=str(x.first_name),
                              last_name=str(x.last_name),
                              total_all_publisher=int(x.total_all_publisher)))
     .with_output_types(TeacherRow)
     | beam.WindowInto(beam.window.FixedWindows(10))
     .with_output_types(TeacherRow)
     | 'Write to jdbc' >> WriteToJdbc(
                table_name="teacher",
                driver_class_name='org.postgresql.Driver',
                jdbc_url='jdbc:{}://{}:{}/{}'.format("postgresql", "your ip address", "5432", "postgres"),
                username="postgres",
                password="password"
            )
     )
    p.run()

我得到以下错误

  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/Users/trex/workspace/workflow/dataflow/bq-to-pg.py", line 102, in <module>
    run()
  File "/Users/Trex/workspace/workflow/dataflow/bq-to-pg.py", line 97, in run
    export_to_postgres(user_options, pipeline_options, password)
  File "/Users/trex/workspace/workflow/dataflow/bq-to-pg.py", line 58, in export_to_postgres
    p.run()
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
    return Pipeline.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
    transform = ptransform.PTransform.from_runner_api(proto, context)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
    return constructor(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
    DoFnInfo.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
    raise ValueError('Unexpected DoFn type: %s' % spec.urn)
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1```

共有1个答案

丁曦
2023-03-14

这是https://issues.apache.org/jira/browse/beam-12043,希望在下一个版本中能有一个修正。

 类似资料:
  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 我试图用JenkinsFile构建一个Jenkins声明性管道。Jenkinsfile将出现在项目的回购中。

  • 主要内容:实例,实例,实例,实例,实例,实例关键词:流水线,乘法器 硬件描述语言的一个突出优点就是指令执行的并行性。多条语句能够在相同时钟周期内并行处理多个信号数据。 但是当数据串行输入时,指令执行的并行性并不能体现出其优势。而且很多时候有些计算并不能在一个或两个时钟周期内执行完毕,如果每次输入的串行数据都需要等待上一次计算执行完毕后才能开启下一次的计算,那效率是相当低的。流水线就是解决多周期下串行数据计算效率低的问题。 流水线 流水线的基

  • 问题内容: 我必须用Java实现HTTP客户端,并且出于我的需要,似乎最有效的方法是实现HTTP管道(按照RFC2616)。 顺便说一句,我想管道POST。(我也不在谈论多路复用。我在谈论流水线,即在接收到任何HTTP请求的响应之前,通过一个连接发送许多请求) 我找不到明确声明其支持流水线的第三方库。但是我可以使用例如Apache HTTPCore 来构建这样的客户端,或者如果需要的话,可以自己构

  • 流水账单详细记录了云账号支出明细信息。 流水账单详细记录了云账号支出明细信息。一条流水账单详细记录了用户通过云账号在1天内一次性使用资源的时长和产生的费用信息,一条流水账单的资源使用时长最长为1天。1天内若多次使用指定资源,将产生多条流水账单记录。 入口:在云管平台单击左上角导航菜单,在弹出的左侧菜单栏中单击 “费用/账单/流水账单” 菜单项,进入流水账单页面。 查看流水账单 该功能用于查看流水账

  • 我不太会流口水和咕噜。 我有一个关于规则流的基本问题。 我在guvnor插件上使用引导编辑器创建了3条规则。现在我想根据第一条规则的结果调用第二条或第三条规则。 e、 g.如果患者年龄小于18岁,则进行第二条规则的小检查,否则请调用第三条规则由高级医生进行检查。 那么,这可以通过使用规则流来实现吗?如果是,如何?是否有任何示例链接和文档来演示它?非常感谢您的帮助。 谢啦