当前位置: 首页 > 知识库问答 >
问题:

增量加载和 BigQuery

柳高卓
2023-03-14

我正在编写一个增量加载管道,将数据从MySQL加载到BigQuery,并使用Google Cloud Datastore作为元数据存储库。

我当前的管道是这样写的:

PCollection<TableRow> tbRows = 
pipeline.apply("Read from MySQL",
        JdbcIO.<TableRow>read().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("com.mysql.cj.jdbc.Driver", connectionConfig)
                .withUsername(username)
                .withPassword(password)
                .withQuery(query).withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow())))
    .setCoder(NullableCoder.of(TableRowJsonCoder.of()));

tbRows.apply("Write to BigQuery",
            BigQueryIO.writeTableRows().withoutValidation()
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND).to(outputTable));

tbRows.apply("Getting timestamp column",
                MapElements.into(TypeDescriptors.strings())
                        .via((final TableRow row) -> (String) row.get(fieldName)))
                .setCoder(NullableCoder.of(StringUtf8Coder.of())).apply("Max", Max.globally())
                .apply("Updating Datastore", ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(final ProcessContext c) {
                        DatastoreConnector.udpate(table, c.element());
                    }
                }));

我面临的问题是,当BigQuery写入步骤失败时,数据存储仍在更新,是否有任何方法在更新数据存储之前等待BigQuery写完成?

谢谢!

共有1个答案

边永贞
2023-03-14

目前,这不能在与BigQueryIO.writeTableRows()的同一管道中完成,因为它会产生终端输出(PDone)。不过我有一些建议。

  • 我怀疑BigQuery写入失败是罕见的。在这种情况下,您可以从辅助作业/进程中删除相应的Datastore数据。
  • 您是否考虑过更适合写入增量更改数据的CDC解决方案。例如,请参阅此处的数据流模板。
 类似资料:
  • 我有一个S3存储桶,每天的文件都会被丢弃。AWS爬虫从该位置爬网数据。在我的glue作业运行的第一天,它将获取AWS crawler创建的表中的所有数据。例如,在第一天就有三个文件。(即file1.txt、file2.txt、file3.txt)和glue job在执行glue job的第一天处理这些文件。第二天,另两个文件到达S3位置。现在,在S3位置,这些是存在的文件。(即file1.txt、

  • 问题内容: 在JDK 1.7中,该方法使用以下表达式来增加ArrayList.java的数组容量:因此,看来新容量几乎比旧容量增加了50%。 但是在很多书中都说容量翻了一番…所以书没有更新或我不太了解? 问题答案: 您的理解是正确的,newCapacity比oldCapacity大50% 在Java 6中,newCapacity计算为 这是Java之类的开源语言的魅力,您可以看到实现-如果它不符合

  • 我试图从SSRS服务器读取报告,问题是我的内存流不能超过65536字节。 到目前为止,我已经尝试过使用内存流,但尚未成功设置其容量,然后再阅读报告本身 上面的MemoryStream必须在我读取文件之前增加它的容量。 我试过在我的应用程序中玩。配置,但我不知道从哪里开始设置内存流的字节容量

  • 你能帮我理解一下为什么我不能增加静态变量吗?我面临以下问题:*错误LNK2001:未解析的外部符号“private:static unsigned int counter::m_curcounters”(?m_curcounters@counter@@0ia)*

  • 问题内容: 我了解对此主题有很多疑问。但是我仍然有些困惑,不确定何时使用这些操作。我正在为参加考试而做的旧考试。其中一种方法返回可访问的残疾人可用教室的数量。我编写了counter方法,但不确定是应该先递增还是递后递增计数器。我对它如何与方法中的return语句混淆。我仍然不知道该方法将在下面返回什么值。其他问题未在方法中显示返回值,因此我对它的工作方式感到困惑。这是代码: 问题答案: 当您要在表

  • 问题内容: 为什么这项工作 但是扩充会导致语法错误。 我期待有另一种方式: 问题答案: 您不能在多个目标上使用增强分配语句。 引用扩充作业文档: 除了在单个语句中分配给元组和多个目标外, 由扩展赋值语句完成的赋值与普通赋值的处理方式相同。类似地,除了可能 的就地 行为外,通过扩充分配执行的二进制操作与正常的二进制操作相同。 强调我的。 就地扩展分配从转换为(每个操作员都有相应的钩子),并且不支持将