当前位置: 首页 > 知识库问答 >
问题:

如何在Apache Beam中用BigQuery IO写入BigQuery?

马绪
2023-03-14

我正在尝试建立一个Apache Beam管道,该管道使用Apache Beam读取Kafka并写入BigQuery。我使用这里的逻辑筛选出一些坐标:https://www.talend.com/blog/2018/08/07/development-data-processing-job-using-apache-beam-streaming-pipeline/TLDR:主题中的消息的格式为id,x,y。筛选出x>100或y>100的所有消息

我读取数据,进行几次转换,然后定义我的表模式,然后尝试写入BigQuery。我不确定如何调用write方法。可能是缺乏Java仿制知识。我相信它应该是一个PCollection,但不能安静地弄清楚它。

下面是管道代码-如果它被认为是代码转储,我只想给出整个上下文:

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
                KafkaIO.<Long, String>read()
                        .withBootstrapServers(options.getBootstrap())
                        .withTopic(options.getInputTopic())
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class))
        .apply(
                ParDo.of(
                        new DoFn<KafkaRecord<Long, String>, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext processContext) {
                                KafkaRecord<Long, String> record = processContext.element();
                                processContext.output(record.getKV().getValue());
                            }
                        }))
        .apply(
                "FilterValidCoords",
                Filter.by(new FilterObjectsByCoordinates(options.getCoordX(), options.getCoordY())))
        .apply(
                "ExtractPayload",
                ParDo.of(
                        new DoFn<String, KV<String, String>>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) throws Exception {
                                c.output(KV.of("filtered", c.element()));
                            }
                        }));

        TableSchema tableSchema =
        new TableSchema()
                .setFields(
                        ImmutableList.of(
                                new TableFieldSchema()
                                        .setName("x_cord")
                                        .setType("STRING")
                                        .setMode("NULLABLE"),
                        new TableFieldSchema()
                                .setName("y_cord")
                                .setType("STRING")
                                .setMode("NULLABLE")

                        ));
        pipeline
                .apply(
                "Write data to BQ",
                BigQueryIO
                        .<String, KV<String, String>>write() //I'm not sure how to call this method
                        .optimizedWrites()
                        .withSchema(tableSchema)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
                        .withMethod(FILE_LOADS)
                        .to(new TableReference()
                                .setProjectId("prod-analytics-264419")
                                .setDatasetId("publsher")
                                .setTableId("beam_load_test"))
        );

共有1个答案

施振海
2023-03-14

你想要这样的东西:

[..] 
pipeline.apply(BigQueryIO.writeTableRows()
        .to(String.format("%s.dataset.table", options.getProject()))
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withWriteDisposition(WRITE_APPEND)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withSchema(getTableSchema()));
 类似资料:
  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 我想写一些代码来处理从web下载时的错误。 这两条语句运行成功。下面,我创建了一个不存在的网址: 不存在。如何编写循环(函数)以便: 当URL错误时,输出为:“web URL错误,无法获取” 当URL错误时,代码不会停止,而是继续下载,直到URL列表结束

  • 我必须得到特定的数据,如"角色"表已提交状态,我需要状态=1,所有数据从角色表

  • 我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实

  • 问题内容: 我在一个免费的支持PHP的服务器上安装了此脚本: 它创建文件,但为空。 如何创建文件并向其中写入内容,例如“猫追老鼠”行? 问题答案: 您可以使用更高级别的函数,例如,与调用,相同,然后依次将数据写入文件。

  • 我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/