当前位置: 首页 > 知识库问答 >
问题:

Apache Beam窗口化和sharding BigQuery输出表

郭博涉
2023-03-14

我的用例很简单:从pub/sub订阅中读取事件日志,解析它们并保存到BigQuery中。由于预计事件的数量将显著增加,而且我使用的是无界数据源,所以我决定在BigQuery中配置sharding:根据事件数据的时间戳(在Beam文档中称为“事件时间”)将事件存储到每日表中。我的问题是,我是否需要在我的情况下配置窗口,或者我可以只保留默认配置,隐式地使用全局窗口?我之所以问这个问题,是因为我发现的大多数BigQuery sharding示例都假设使用窗口配置。但是在我的例子中,由于我没有使用任何分组操作作为groupbykeycombine,所以在没有任何窗口配置的情况下应该可以。或者有什么理由让我使用窗口,例如,它可能会影响BigQueryio的执行方式?

下面是我现在分叉的方式。

static class TableNamingFn implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);

        TableReference reference = new TableReference();
        reference.setProjectId("test-project");
        reference.setDatasetId("event_log");

        DateTime timestamp = new DateTime(input.getValue().get("event_timestamp"), DateTimeZone.UTC);
        reference.setTableId("events_" + formatter.print(timestamp));
        return new TableDestination(reference, null);
    }
}

// And then
eventRows.apply("BigQueryWrite", BigQueryIO.writeTableRows()
        .to(new TableNamingFn())
        .withSchema(EventSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

共有1个答案

庄飞
2023-03-14

您是否考虑过使用日期分区表来代替,这看起来像是试图按日期对表进行分段。您可以使用分区装饰器更新设置表id的位置,如下所示:

reference.setTableId("events$" + formatter.print(timestamp));

本文介绍了如何将BigQuery的分区表与Apache Beam一起使用。特别是,这段代码可能是您想要使用的:https://gist.githubusercontent.com/AlexvanBoxel/902099911d86b6827c8ea07f4e1437d4/raw/cc8246eb9b3219550379cfe7b3b7abca8fc77401/media_bq_tableref_partition

 类似资料:
  • 现在,你已经在运用 MATLAB 的命令行了,输入命令语句,就会看到结果出现在命令窗口中。 这一节介绍如何: 控制输出值的格式 用MATLAB命令隐藏输出 在命令行中输入长的命令 编辑命令行

  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 我有一份flink工作,需要在1小时内重复删除收到的记录。重复数据消除后,我需要收集所有这些重复数据消除的文档,并进行一些聚合,如计数,然后生成目标主题。 现在,由于我只需要收集那些重复数据消除的文档,所以可能不需要等待1小时。我如何避免仅为收集这些文档而设置1个小时的窗口,但一旦收集到这些文档,就继续进行聚合。 因此,资源会占用内存,检查点大小也在增加,这是我想要避免的。 水印策略: 如有任何建

  • 问题内容: 我想使用Firefox浏览器,使用RSelenium从网站下载文件。我正确地完成了所有操作(导航,选择正确的元素并写下我想要的内容);现在,我单击“下载”按钮,然后打开一个Firefox弹出窗口,并询问我是否要下载文件或“用…打开”。 不幸的是,由于隐私限制,我无法编写示例。 我的问题是:如何在需要时切换到弹出窗口/警报并单击“确定”? 我尝试了以下方法,但均未成功: 我也试过了 但是

  • Popup 是一种可以包含任何Html内容的弹出窗口,从App的主内容区域上弹出。 Popup 和其他所有的遮罩图层一样,是所谓的“临时视图”的一部分。 Popup 布局 Popup 布局相当简单. 你所需要做的就是将放到 body 里正确的位置上: <div class="modal modal-no-buttons"> ... <div class="popup"> An

  • 问题内容: 因此,我一直在用Qt为我的Python应用程序创建GUI。我现在遇到的情况是,按下按钮后,将执行适当的推迟操作,我们执行一些任务,然后需要打开一个单独的窗口,其中包含一两个东西。但是我似乎无法弄清楚如何创建这个新的单独窗口。谁能给我一个如何创建一个例子吗? 问题答案: 一个使您抓狂的常见错误是忘记将创建的弹出窗口的句柄存储在将保持活动状态的python变量中(例如,存储在主窗口的数据成