当前位置: 首页 > 知识库问答 >
问题:

数据流:在管道完成后访问值提供程序

国阳
2023-03-14

我试图在管道完成后进行滞后更新,由于日期版本控制,表在运行时传入。由于此代码是作为模板执行的,因此需要使用NestedValueProviders。

public interface DataQueryRunnerOptions extends DataflowPipelineOptions {

@Description("Table to read/write payload data.")
    @Default.String("test.payloadData")
    ValueProvider<String> getPayloadTable();

@Description("Table to read eligibility data from, and update with payloadData")
    @Default.String("test.dqr_test_eligibilities")
    ValueProvider<String> getEligibilityInputTable();

}
campaignIdToDataQueryMap.apply("RunDataQueries", ParDo.of(new RunDataQueries()))
      .apply("WritePayloadDataToTable", BigQueryIO.writeTableRows()
        .withSchema(getPayloadDataSchema())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .to(options.getPayloadTable()));
pipeline.run().waitUntilFinish();

runFinalUpdate(options);
private static void runFinalUpdate(DataQueryRunner2Options options) {

    ValueProvider.NestedValueProvider eligTable = ValueProvider.NestedValueProvider.of(
      options.getEligibilityInputTable(),
      (SerializableFunction<String, String>) eligibilityInputTable -> options.getEligibilityInputTable().get()
    );

    ValueProvider.NestedValueProvider payloadTable = ValueProvider.NestedValueProvider.of(
      options.getPayloadTable(),
      (SerializableFunction<String, String>) payload -> options.getPayloadTable().get()
    );

    String finalUpdate = "UPDATE " + eligTable.get() + " elig SET elig.dataQueryPayload = (SELECT pd.dataQueryPayload FROM `"
      + payloadTable.get() + "` pd WHERE pd.numericId = elig.numericId and pd.campaignId = elig.campaignId)"
      + " WHERE elig.dataQueryPayload IS NULL";

    try {
      Utilities.runQuery(finalUpdate);
    } catch (InterruptedException e) {
      LOG.error("Final update failure: " + e.getMessage());
      e.printStackTrace();
    }
  }
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=eligibilityInputTable, default=test.dqr_test_eligibilities}

如何在管道运行之外访问此值?在管道完成后,是否有更好的方法来做“只做一次”的工作?

共有1个答案

支嘉祥
2023-03-14

ValueProvider接口允许管道接受运行时参数。为了访问这些值以用于报告/日志记录,您需要在Beam DAG中访问它们。一个潜在的解决方案是在管道中创建一个报告分支,该分支接受单个虚拟值,并在“处理”该虚拟值的DoFn中将选项导出到外部存储区。

Java(SDK 2.9.0):

public interface YourOptions extends PipelineOptions {
 @Description("Your option")
 @Default.String("Hello World!")
 ValueProvider<String> getStringValue();

 void setStringValue(ValueProvider<String>  value);
}

public static void main(String[] args) {

 // Create pipeline
 YourOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
     .as(YourOptions.class);

 Pipeline p = Pipeline.create(options);

 // Branch for pushing the Value Provider value

 p.apply(Create.of(1)).apply(ParDo.of(new DoFn<Integer, Integer>() {

   @ProcessElement public void process(ProcessContext c) {

     YourOptions ops = c.getPipelineOptions().as(YourOptions.class);
     // Do something like push to DB here....
     LOG.info("Option StringValue was {}" , ops.getStringValue());

   }
 }));

 // The main pipeline....
 p.apply(Create.of(1,2,3,4)).apply(Sum.integersGlobally());

 p.run();
}
 类似资料:
  • 我想在谷歌数据流上运行一个管道,该管道取决于另一个管道的输出。现在,我正在本地使用DirectRunner依次运行两条管道: 我的问题如下: DataflowRunner是否保证第二个仅在第一个管道完成后启动

  • 我试图在Beam管道完成后,在Google DataFlow上运行一个函数(或管道)。 目前,我已经构建了一个hack来运行该函数,方法是使用 ... func在哪里: 但是有更好的方法吗?

  • 我试图启动一个流数据流作业,其中包含n个管道。 基于配置的主题和每个主题对应的BQ表,我想在一个流作业内启动一个管道。 这里的运行时参数为bucket_name和config_json_path,用于所有与配置相关的数据集、BQ表、topics/Subscription和所有工作流选项。 这到底有没有可能?因为谷歌也提供了一对一的模板。不是很多对很多模板(例如三个主题-三个BQ表(三个数据流水线)

  • 问题 如何在web.py中提供XML访问? 如果需要为第三方应用收发数据,那么提供xml访问是很有必要的。 解法 根据要访问的xml文件(如response.xml)创建一个XML模板。如果XML中有变量,就使用相应的模板标签进行替换。下面是一个例子: $def with (code) <?xml version="1.0"?> <RequestNotification-Response> <St

  • 我有一个应用程序,存储256行数字,指示网格视图中每个256个单元格的背景颜色。目前所有的值都是-16449536,表示简单的黑色。 以下是我在Firebase数据库中的JSON结构 在onCreative方法中,我使用for循环为每个单元格读取256次: 并非所有网格都是黑色的。每次我重新打开应用程序以触发onCreate方法时,网格模式都会有所不同。我是否达到了firebase实时数据库的某些

  • 我使用beam SDK用python编写了一个Google数据流管道。有一些文档介绍了我如何在本地运行它,并设置runner标志以在数据流上运行它。 我现在正尝试将其自动部署到CI管道(bitbucket管道,但并不真正相关)。有关于如何“运行”管道的文档,但没有真正的“部署”管道。我测试过的命令如下: 这将运行作业,但因为它正在流式传输,所以永远不会返回。它还在内部管理打包并推送到存储桶。我知道