我正在编写一个增量加载管道,将数据从MySQL加载到BigQuery,并使用Google Cloud Datastore作为元数据存储库。
我当前的管道是这样写的:
PCollection<TableRow> tbRows =
pipeline.apply("Read from MySQL",
JdbcIO.<TableRow>read().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create("com.mysql.cj.jdbc.Driver", connectionConfig)
.withUsername(username)
.withPassword(password)
.withQuery(query).withCoder(TableRowJsonCoder.of())
.withRowMapper(JdbcConverters.getResultSetToTableRow())))
.setCoder(NullableCoder.of(TableRowJsonCoder.of()));
tbRows.apply("Write to BigQuery",
BigQueryIO.writeTableRows().withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND).to(outputTable));
tbRows.apply("Getting timestamp column",
MapElements.into(TypeDescriptors.strings())
.via((final TableRow row) -> (String) row.get(fieldName)))
.setCoder(NullableCoder.of(StringUtf8Coder.of())).apply("Max", Max.globally())
.apply("Updating Datastore", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(final ProcessContext c) {
DatastoreConnector.udpate(table, c.element());
}
}));
我面临的问题是,当BigQuery写入步骤失败时,数据存储仍在更新,是否有任何方法在更新数据存储之前等待BigQuery写完成?
谢谢!
目前,这不能在与BigQueryIO.writeTableRows()
的同一管道中完成,因为它会产生终端输出(PDone
)。不过我有一些建议。
我有一个S3存储桶,每天的文件都会被丢弃。AWS爬虫从该位置爬网数据。在我的glue作业运行的第一天,它将获取AWS crawler创建的表中的所有数据。例如,在第一天就有三个文件。(即file1.txt、file2.txt、file3.txt)和glue job在执行glue job的第一天处理这些文件。第二天,另两个文件到达S3位置。现在,在S3位置,这些是存在的文件。(即file1.txt、
问题内容: 在JDK 1.7中,该方法使用以下表达式来增加ArrayList.java的数组容量:因此,看来新容量几乎比旧容量增加了50%。 但是在很多书中都说容量翻了一番…所以书没有更新或我不太了解? 问题答案: 您的理解是正确的,newCapacity比oldCapacity大50% 在Java 6中,newCapacity计算为 这是Java之类的开源语言的魅力,您可以看到实现-如果它不符合
我试图从SSRS服务器读取报告,问题是我的内存流不能超过65536字节。 到目前为止,我已经尝试过使用内存流,但尚未成功设置其容量,然后再阅读报告本身 上面的MemoryStream必须在我读取文件之前增加它的容量。 我试过在我的应用程序中玩。配置,但我不知道从哪里开始设置内存流的字节容量
你能帮我理解一下为什么我不能增加静态变量吗?我面临以下问题:*错误LNK2001:未解析的外部符号“private:static unsigned int counter::m_curcounters”(?m_curcounters@counter@@0ia)*
问题内容: 我了解对此主题有很多疑问。但是我仍然有些困惑,不确定何时使用这些操作。我正在为参加考试而做的旧考试。其中一种方法返回可访问的残疾人可用教室的数量。我编写了counter方法,但不确定是应该先递增还是递后递增计数器。我对它如何与方法中的return语句混淆。我仍然不知道该方法将在下面返回什么值。其他问题未在方法中显示返回值,因此我对它的工作方式感到困惑。这是代码: 问题答案: 当您要在表
问题内容: 为什么这项工作 但是扩充会导致语法错误。 我期待有另一种方式: 问题答案: 您不能在多个目标上使用增强分配语句。 引用扩充作业文档: 除了在单个语句中分配给元组和多个目标外, 由扩展赋值语句完成的赋值与普通赋值的处理方式相同。类似地,除了可能 的就地 行为外,通过扩充分配执行的二进制操作与正常的二进制操作相同。 强调我的。 就地扩展分配从转换为(每个操作员都有相应的钩子),并且不支持将