当前位置: 首页 > 知识库问答 >
问题:

DataFlow/Apache Beam-如何按顺序设计流水线操作?

袁何平
2023-03-14

作为DataFlow/Apache Beam的一部分,我希望先从一个源读取,然后再写入一个源,然后再从一个源读取,然后再按此顺序写入。我如何确保下面R->W->R->W的顺序?我相信下面的运行就像一个r->w的并行管道。我不确定是否使用PDone对象来实现这一点。

(在下面的示例中,考虑BIGQUERYVIEWB是一个由TestDataSet1.Table2和其他几个表组成的大查询视图)

//Read 1
PCollection<TableRow> tr = pipeline.apply(BigQueryIO.readTableRows().fromQuery("SELECT ID FROM `TESTDATASET1.BIGQUERYVIEWA`").usingStandardSql());
PCollection<TableRow> tr1= tr.apply(ParDo.of(new SomeFn()));
//Write 1
tr1.apply(BigQueryIO.writeTableRows().withoutValidation()
                    .withSchema(FormatRemindersFn.getSchema())
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .to("TESTDATASET1.TABLE2"));
//Read 2
PCollection<TableRow> tr2 = pipeline.apply(BigQueryIO.readTableRows().fromQuery("SELECT ID FROM `TESTDATASET1.BIGQUERYVIEWB`").usingStandardSql());
PCollection<TableRow> tr3= tr.apply(ParDo.of(new SomeFn()));
//Write 2
tr3.apply(BigQueryIO.writeTableRows().withoutValidation()
                    .withSchema(FormatRemindersFn.getSchema())
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .to("TESTDATASET1.TABLE3"));

共有1个答案

郭恩
2023-03-14

下面是一个简单的伪代码示例

PCollection<String> data = ...;

PCollection<Something> first = data.apply(ParDo.of(..))

data.apply(Wait.On(first)).apply(ParDo.of(..))

注意wait.on(..)需要PCollection作为信号,而不是PDONE。我相信BigQuery.write返回一个WriteResult,您可以从中提取失败插入的PCollection。

 类似资料:
  • 问题内容: 我正在使用rub redis宝石。想知道我是否例如: 这样的执行顺序得到保证吗? 问题答案: 当然可以保证顺序,否则流水线将毫无用处。您可以随时查看代码。例如,此测试明确假定命令是按顺序执行的:https : //github.com/redis/redis- rb/blob/master/test/pipelining_commands_test.rb#L32

  • 有人可以通过添加到我已经编码的内容来帮助我设计解决方案吗?或者向我指出一个已经存在的模式来解决这个问题?

  • 我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实

  • 在本节中,我们将研究如何链接不同的估计器。 简单示例:估计器之前的特征提取和选择 特征提取:向量化器 对于某些类型的数据,例如文本数据,必须应用特征提取步骤将其转换为数值特征。 为了说明,我们加载我们之前使用的 SMS 垃圾邮件数据集。 import os with open(os.path.join("datasets", "smsspam", "SMSSpamCollection")) as

  • 我对指令流水线有些怀疑。 我有一个集会

  • 问题内容: 如果我有这样的表和数据: 我希望按照从小到大的Group总数对它进行排序,例如:A-2个记录,B-1个记录,C-3个记录,因此它将变为: 我试过了 但这只会为我返回一个结果。 有什么提示吗?谢谢。 问题答案: 您需要首先聚合数据,这可以使用GROUP BY子句完成: 关键字DESC允许您首先显示最高计数,默认情况下按ORDER BY升序排列,这将首先显示最低计数。