我正在尝试建立一个Apache Beam管道,该管道使用Apache Beam读取Kafka并写入BigQuery。我使用这里的逻辑筛选出一些坐标:https://www.talend.com/blog/2018/08/07/development-data-processing-job-using-apache-beam-streaming-pipeline/TLDR:主题中的消息的格式为id,x,y。筛选出x>100或y>100的所有消息
我读取数据,进行几次转换,然后定义我的表模式,然后尝试写入BigQuery。我不确定如何调用write方法。可能是缺乏Java仿制知识。我相信它应该是一个PCollection,但不能安静地弄清楚它。
下面是管道代码-如果它被认为是代码转储,我只想给出整个上下文:
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(
KafkaIO.<Long, String>read()
.withBootstrapServers(options.getBootstrap())
.withTopic(options.getInputTopic())
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class))
.apply(
ParDo.of(
new DoFn<KafkaRecord<Long, String>, String>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<Long, String> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}))
.apply(
"FilterValidCoords",
Filter.by(new FilterObjectsByCoordinates(options.getCoordX(), options.getCoordY())))
.apply(
"ExtractPayload",
ParDo.of(
new DoFn<String, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of("filtered", c.element()));
}
}));
TableSchema tableSchema =
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema()
.setName("x_cord")
.setType("STRING")
.setMode("NULLABLE"),
new TableFieldSchema()
.setName("y_cord")
.setType("STRING")
.setMode("NULLABLE")
));
pipeline
.apply(
"Write data to BQ",
BigQueryIO
.<String, KV<String, String>>write() //I'm not sure how to call this method
.optimizedWrites()
.withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
.withMethod(FILE_LOADS)
.to(new TableReference()
.setProjectId("prod-analytics-264419")
.setDatasetId("publsher")
.setTableId("beam_load_test"))
);
你想要这样的东西:
[..]
pipeline.apply(BigQueryIO.writeTableRows()
.to(String.format("%s.dataset.table", options.getProject()))
.withCreateDisposition(CREATE_IF_NEEDED)
.withWriteDisposition(WRITE_APPEND)
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withSchema(getTableSchema()));
我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。
我想写一些代码来处理从web下载时的错误。 这两条语句运行成功。下面,我创建了一个不存在的网址: 不存在。如何编写循环(函数)以便: 当URL错误时,输出为:“web URL错误,无法获取” 当URL错误时,代码不会停止,而是继续下载,直到URL列表结束
我必须得到特定的数据,如"角色"表已提交状态,我需要状态=1,所有数据从角色表
我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实
问题内容: 我在一个免费的支持PHP的服务器上安装了此脚本: 它创建文件,但为空。 如何创建文件并向其中写入内容,例如“猫追老鼠”行? 问题答案: 您可以使用更高级别的函数,例如,与调用,相同,然后依次将数据写入文件。
我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/