当前位置: 首页 > 知识库问答 >
问题:

Google数据流在BigQuery中写入多行

慕晨
2023-03-14

我有一个简单的流程,目的是在一个BigQuery表中写两行。我使用动态目标,因为之后我将在多个表上写,在那个例子中是同一个表...问题是我的BigQuery表最后只有一行。在第二次插入时,我看到以下错误

"状态:{code: 6
消息:"已存在:作业sampleProject et3:b9912b9b05794aec8f4292b2ae493612_eeb0082ade6f4a58a14753d1cc92ddbc_00001-0"
}"

这是什么意思?和这个限制有关吗?https://github . com/GoogleCloudPlatform/DataflowJavaSDK/issues/550我该如何做这项工作?

我用的是BeamSDK 2.0.0,我试过2.1.0(结果相同)

我发射的方式:

mvn compile exec:java-Dexec.mainClass=fr.gireve.dataflow。LogsFlowBug-Dexec。args=“--runner=DataflowRunner--inputDir=gs://sampleprojet3.appspot.com/--project=sampleprojet3--stagingLocation=gs://dataflow-sampleprojet3/tmp”-Pdataflow runner

    Pipeline p = Pipeline.create(options);

    final List<String> tableNameTableValue = Arrays.asList("table1:value1", "table1:value2", "table2:value1", "table2:value2");

    p.apply(Create.of(tableNameTableValue)).setCoder(StringUtf8Coder.of())
            .apply(BigQueryIO.<String>write()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .to(new DynamicDestinations<String, KV<String, String>>() {
                @Override
                public KV<String, String> getDestination(ValueInSingleWindow<String> element) {
                    final String[] split = element.getValue().split(":");
                    return KV.of(split[0], split[1]) ;
                }

                @Override
                public Coder<KV<String, String>> getDestinationCoder() {
                    return KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
                }

                @Override
                public TableDestination getTable(KV<String, String> row) {
                    String tableName = row.getKey();
                    String tableSpec = "sampleprojet3:testLoadJSON." + tableName;
                    return new TableDestination(tableSpec, "Table " + tableName);
                }

                @Override
                public TableSchema getSchema(KV<String, String> row) {
                    List<TableFieldSchema> fields = new ArrayList<>();
                    fields.add(new TableFieldSchema().setName("myColumn").setType("STRING"));
                    TableSchema ts = new TableSchema();
                    ts.setFields(fields);
                    return ts;
                }
            })
            .withFormatFunction(new SerializableFunction<String, TableRow>() {
                public TableRow apply(String row) {
                    TableRow tr = new TableRow();
                    tr.set("myColumn", row);
                    return tr;
                }
            }));

    p.run().waitUntilFinish();

谢谢

共有1个答案

经福
2023-03-14

DynamicDestments将每个元素与一个目标相关联-即元素应该去的地方。元素根据其目的地路由到BigQuery表:1个目的地=1个带有架构的BigQuery表:目的地应该包含足够的信息来生成TableDestation和架构。具有相同目的地的元素转到相同的表,具有不同目的地的元素转到不同的表。

您的代码片段使用动态目的地,其目标类型包含元素和表,这是不必要的,当然,违反了上述约束:具有不同目标的元素最终转到同一个表:例如,KV(“table1”,“value1”)KV(“table1”,“value2”)是不同的目标,但您的getTable将它们映射到同一个表表1

您需要从目标类型中删除该元素。这也将导致更简单的代码。作为旁注,我认为你不需要重写 getDestinationCoder() - 它可以自动推断出来。

试试这个:

        .to(new DynamicDestinations<String, String>() {
            @Override
            public String getDestination(ValueInSingleWindow<String> element) {
                return element.getValue().split(":")[0];
            }

            @Override
            public TableDestination getTable(String tableName) {
                return new TableDestination(
                    "sampleprojet3:testLoadJSON." + tableName, "Table " + tableName);
            }

            @Override
            public TableSchema getSchema(String tableName) {
                List<TableFieldSchema> fields = Arrays.asList(
                    new TableFieldSchema().setName("myColumn").setType("STRING"));
                return new TableSchema().setFields(fields);
            }
        })
 类似资料:
  • 使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错:

  • 我们有一个托管在Google Kubernetes引擎上的NodeJS API,我们想开始将事件记录到BigQuery中。 我可以看到三种不同的方法: 使用API中的节点BigQuery SDK将每个事件直接插入BigQuery(如此处“流式插入示例”下所述):https://cloud.google.com/bigquery/streaming-data-into-bigquery或此处:htt

  • 我试图用Cloud Dataflow(Beam Python SDK)将它读写到BigQuery。 读写2000万条记录(约80 MBs)几乎需要30分钟。 查看dataflow DAG,我可以看到将每个CSV行转换为BQ行花费了大部分时间。

  • 分析输入后,有两个选项可用: 如果x=1->插入 如果x=2->更新 测试 null

  • 我希望从ParDo函数中调用操作,为中的每个键生成单独的BigQuery表(我使用的是python SDK)。这里有两个类似的线程,不幸的是没有帮助: 1)https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey 当我执行以下代码时,第一个键的行被插入到BigQuery,然后管道失败,出现以下错误

  • 有一个在Dataflow中使用过DynamicDestination的人,他有一个简单的描述示例。在git(https://github.com/googleCloudPlatform/dataflowTemplates/blob/master/src/main/Java/com/google/cloud/teleport/templates/dlpTextToBigQueryStreaming.