我使用apache beam和google dataflow Runner将数据从kafka流到BigQuery。我想利用insertId进行重复数据删除,我在google Docs中发现了这一点。但是,即使这些插入是在几秒钟内发生的,我仍然看到许多行具有相同的insertID。现在我在想,也许我没有正确地使用API来利用BQ提供的流式插入的重复数据删除机制。
我在beam中编写的代码如下所示:
payments.apply("Write Fx Payments to BQ", BigQueryIO.<FxPayment>write()
.withFormatFunction(ps -> FxTableRowConverter.convertFxPaymentToTableRow(ps))
.to(bqTradePaymentTable)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
除了所有其他字段之外,我将在FxTableRowConverter.ConvertFxPaymentTotableRow方法中直接在TableRow上设置insertId作为格式函数传递给BigQueryIO:
row.set("insertId", insertId);
我还将该字段作为列添加到BQ中。没有它,它在插入上失败了(很明显)。除了将insertId添加到TableRow对象之外,我找不到其他直接在BigQueryIO上设置insertId的方法。
这是正确的使用方法吗?因为它对我不起作用,我看到许多重复,即使我不应该,因为正如我已经提到的插入发生在几秒钟内。BigQuery文档声明流式缓冲区将insertId保持至少一分钟。
不能在数据流https://stackoverflow.com/A/54193825/1580227中为BigQuery流手动指定insertId
我有一个在MongoDB中存储数据脚本,我想用另一个脚本删除文档。我的数据库中存储的每个文档都采用以下格式: 我想做的是通过传递密钥(在本例中为k1526346000_500)来删除文档,当我尝试执行以下操作时: 编辑:当我转储该文档时,我会得到以下结果: 所以这里肯定不是用“_id”,但我不知道如何捕捉这个字段:(
我有一个spring boot应用程序(基于spring-boot-starter-data-jpa。 我使用了CrudRepository 和几个findBy方法,这些方法都可以工作。而且我有一个派生的deleteBy方法--这不起作用。签名很简单: 实体也很简单: deleteBy方法不起作用的原因是它似乎只向数据库发出一个“select”语句,该语句选择所有的MyEntity行,这些行具有一
我是JSTL新手,由于某种原因,我无法让测试线工作。以下是我使用的简化代码: 当我使用 ${hasChild} 它在屏幕上打印真实,但在测试线上不会发出嗡嗡声,我不知道为什么。有人能帮忙吗?
请注意,在转向您之前,我已经浏览了各种帖子。事实上,我尝试实现中提供的解决方案:基于“notin”条件从数据帧中删除行 我的问题如下。让我们假设我有一个巨大的数据帧,我想删除重复的数据帧。我很清楚我可以使用drop_duplicates,因为这是最快的最简单的方法。然而,我们的老师希望我们创建一个包含重复项ID的列表,然后根据这些值是否包含在上述列表中删除它们。 现在,让我们看看输出: 因此,我得
本文向大家介绍Oracle删除重复的数据,Oracle数据去重复,包括了Oracle删除重复的数据,Oracle数据去重复的使用技巧和注意事项,需要的朋友参考一下 Oracle 数据库中查询重复数据: select * from employee group by emp_name having count (*)>1; Oracle 查询可以删除的重复数据 select t1.* from
本文向大家介绍Mysql删除重复的数据 Mysql数据去重复,包括了Mysql删除重复的数据 Mysql数据去重复的使用技巧和注意事项,需要的朋友参考一下 MySQL数据库中查询重复数据 select * from employee group by emp_name having count (*)>1; Mysql 查询可以删除的重复数据 select t1.* from employee