当前位置: 首页 > 知识库问答 >
问题:

为什么使用Dataflow写入Bigquery非常慢?

方季同
2023-03-14

我可以以大约每秒10,000次插入的速度将插入直接流式传输到BigQuery,但是当我试图使用Dataflow插入时,'tobqrow'步骤(如下所示)非常慢。每10分钟只有50排,这是4名工人。知道为什么吗?以下是相关代码

PCollection<Status> statuses = p
        .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
        .apply("ExtractData", ParDo.of(new DoFn<String, Status>() {
    @ProcessElement
    public void processElement(DoFn<String, Status>.ProcessContext c) throws Exception {
            String rowJson = c.element();

            try {
                TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
                Status status = TwitterObjectFactory.createStatus(rowJson);
                if (status == null) {
                    TweetsWriter.LOGGER.error("Status is null");
                } else {
                    TweetsWriter.LOGGER.debug("Status value: " + status.getText());
                }
                c.output(status);
                TweetsWriter.LOGGER.debug("Status: " + status.getId());
            } catch (Exception var4) {
                TweetsWriter.LOGGER.error("Status creation from JSON failed: " + var4.getMessage());
            }

    }
}));

statuses
        .apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = new TableRow();
                Status status = c.element();
                row.set("Id", status.getId());
                row.set("Text", status.getText());
                row.set("RetweetCount", status.getRetweetCount());
                row.set("FavoriteCount", status.getFavoriteCount());
                row.set("Language", status.getLang());
                row.set("ReceivedAt", (Object)null);
                row.set("UserId", status.getUser().getId());
                row.set("CountryCode", status.getPlace().getCountryCode());
                row.set("Country", status.getPlace().getCountry());
                c.output(row);
        }
    }))
        .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
                .withSchema(schema)
                .withMethod(Method.STREAMING_INSERTS)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

p.run();

共有1个答案

司徒隐水
2023-03-14

原来Dataflow下的Bigquery并不慢。问题是,'status.getPlace().getCountryCode()'返回NULL,所以它抛出的NullPointerException,我在日志中看不到!显然,数据流日志记录需要改进。它现在运行得很好。只要消息进入主题,几乎立刻就会被写入BigQuery!

 类似资料:
  • 我有一个数据流作业要写入BigQuery。它对于非嵌套模式很好,但是对于嵌套模式却失败了。 下面是我的数据流管道: 我使用以下模式创建了BigQuery表: 我得到以下错误: 有人能给我指路吗?我做错了什么?此外,如果有更好的方法迭代所有嵌套模式并写入BigQuery,请建议? 其他信息我的数据文件:

  • 我想创建一个表,然后使用云函数写入bigquery,但是我不想复制表中的数据,所以我先删除表,然后在每次调用函数时创建表。 所以错误是当我首先删除表时,当它被重新创建以写入时,插入所有无法找到表我得到了这个错误:表abc.abc_names找不到

  • 下面是我用来测试的脚本,在我所知道的情况下(主要来自视频),以及我在我的机器上得到的结果(python 3.8.10,numpy 1.19.5): 更新脚本:

  • 我正在阅读beam文档和一些stackoverflow问题/答案,以便了解如何向BigQuery编写pubsub消息。到目前为止,我已经有了获取消息并能够消息的工作示例。代码如下所示

  • 我正在尝试建立一个Apache Beam管道,该管道使用Apache Beam读取Kafka并写入BigQuery。我使用这里的逻辑筛选出一些坐标:https://www.talend.com/blog/2018/08/07/development-data-processing-job-using-apache-beam-streaming-pipeline/TLDR:主题中的消息的格式为id,

  • 我有一个包含大约 5 亿条记录的 cassandra 表(在 6 个节点中),现在我正在尝试在 Amazon EMR 中使用 spark-cassandra-connector 插入数据 表结构 以下是我的火花提交选项 但是在日志中,我看到写入 Cassandra 大约需要 4-5 分钟才能加载 200,000 条记录(而总执行时间为 6 分钟) 我还在Spark conf中添加了以下内容 但仍然