我可以以大约每秒10,000次插入的速度将插入直接流式传输到BigQuery,但是当我试图使用Dataflow插入时,'tobqrow'步骤(如下所示)非常慢。每10分钟只有50排,这是4名工人。知道为什么吗?以下是相关代码:
PCollection<Status> statuses = p
.apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
.apply("ExtractData", ParDo.of(new DoFn<String, Status>() {
@ProcessElement
public void processElement(DoFn<String, Status>.ProcessContext c) throws Exception {
String rowJson = c.element();
try {
TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
Status status = TwitterObjectFactory.createStatus(rowJson);
if (status == null) {
TweetsWriter.LOGGER.error("Status is null");
} else {
TweetsWriter.LOGGER.debug("Status value: " + status.getText());
}
c.output(status);
TweetsWriter.LOGGER.debug("Status: " + status.getId());
} catch (Exception var4) {
TweetsWriter.LOGGER.error("Status creation from JSON failed: " + var4.getMessage());
}
}
}));
statuses
.apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = new TableRow();
Status status = c.element();
row.set("Id", status.getId());
row.set("Text", status.getText());
row.set("RetweetCount", status.getRetweetCount());
row.set("FavoriteCount", status.getFavoriteCount());
row.set("Language", status.getLang());
row.set("ReceivedAt", (Object)null);
row.set("UserId", status.getUser().getId());
row.set("CountryCode", status.getPlace().getCountryCode());
row.set("Country", status.getPlace().getCountry());
c.output(row);
}
}))
.apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
.withSchema(schema)
.withMethod(Method.STREAMING_INSERTS)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
p.run();
原来Dataflow下的Bigquery并不慢。问题是,'status.getPlace().getCountryCode()'返回NULL,所以它抛出的NullPointerException,我在日志中看不到!显然,数据流日志记录需要改进。它现在运行得很好。只要消息进入主题,几乎立刻就会被写入BigQuery!
我有一个数据流作业要写入BigQuery。它对于非嵌套模式很好,但是对于嵌套模式却失败了。 下面是我的数据流管道: 我使用以下模式创建了BigQuery表: 我得到以下错误: 有人能给我指路吗?我做错了什么?此外,如果有更好的方法迭代所有嵌套模式并写入BigQuery,请建议? 其他信息我的数据文件:
我想创建一个表,然后使用云函数写入bigquery,但是我不想复制表中的数据,所以我先删除表,然后在每次调用函数时创建表。 所以错误是当我首先删除表时,当它被重新创建以写入时,插入所有无法找到表我得到了这个错误:表abc.abc_names找不到
下面是我用来测试的脚本,在我所知道的情况下(主要来自视频),以及我在我的机器上得到的结果(python 3.8.10,numpy 1.19.5): 更新脚本:
我正在阅读beam文档和一些stackoverflow问题/答案,以便了解如何向BigQuery编写pubsub消息。到目前为止,我已经有了获取消息并能够消息的工作示例。代码如下所示
我正在尝试建立一个Apache Beam管道,该管道使用Apache Beam读取Kafka并写入BigQuery。我使用这里的逻辑筛选出一些坐标:https://www.talend.com/blog/2018/08/07/development-data-processing-job-using-apache-beam-streaming-pipeline/TLDR:主题中的消息的格式为id,
我有一个包含大约 5 亿条记录的 cassandra 表(在 6 个节点中),现在我正在尝试在 Amazon EMR 中使用 spark-cassandra-connector 插入数据 表结构 以下是我的火花提交选项 但是在日志中,我看到写入 Cassandra 大约需要 4-5 分钟才能加载 200,000 条记录(而总执行时间为 6 分钟) 我还在Spark conf中添加了以下内容 但仍然