当前位置: 首页 > 知识库问答 >
问题:

如何访问par.do转换中的管道选项?

澹台正真
2023-03-14

所以,我认为管道选项会很好,我只是在模板编译时或运行时传递一个不同的参数,但我要花很长时间访问par.do转换我需要的地方。如果我使用默认运行器并在本地运行管道,它可以正常工作,但是当我切换并构建模板时,值总是null。我可以用下面的代码复制它:

/* 
imports...
*/

@SuppressWarnings("serial")
public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  static String orgId;

  public interface MyOptions extends PipelineOptions {


     @Description("Org Id")
     @Default.String("123-984-a")
     String getOrgId();
     void setOrgId( String orgID );

  }

  public static void main(String[] args) {


     PipelineOptionsFactory.register(MyOptions.class);


     final MyOptions options = PipelineOptionsFactory.fromArgs( args ).withValidation().create()
        .as( MyOptions.class );


     orgId = options.getOrgId();

     LOG.info( "orgId: " + orgId );

     Pipeline p = Pipeline.create( options );


     PCollection<String> someDataRows = p.apply("Get data from BQ", Create.of(

      "string 1", "string2", "string 3"

     ) );


     someDataRows.apply( "Package into a list", ParDo.of( new DoFn<String, String>() {

           @ProcessElement
           public void processElement( ProcessContext c ) {

              LOG.info( "Hello? " );
              LOG.info( "ORG ID: " + orgId );
           }

           }));


    p.run();
  }
}

云中的输出是:

 2018-09-20 (16:16:49) Hello?
 2018-09-20 (16:16:49) ORG ID: null
 2018-09-20 (16:16:51) Hello?
 2018-09-20 (16:16:51) ORG ID: null
 2018-09-20 (16:16:53) Hello?
 2018-09-20 (16:16:53) ORG ID: null
 ...

但在当地:

Sep 20, 2018 4:15:32 PM simplepipeline.StarterPipeline main
INFO: orgId: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
--project=the-project
--stagingLocation=gs://staging.the-project.appspot.com/staging/
--tempLocation=gs://staging.the-project.appspot.com/temp/
--runner=DataflowRunner
--region=us-west1
--templateLocation=gs://staging.the-project.appspot.com/templates/NoobPipelineDev
--orgId=jomama47

对于本地:

--project=the-project
--tempLocation=gs://staging.the-project.appspot.com
--orgId=jomama47

当我在Dataflow控制台(浏览器)中的参数字段中创建作业时,我尝试将参数传递给作业orgidjomama77,但仍然显示为null。

很抱歉帖子太长了。

共有1个答案

柯栋
2023-03-14

这里有两件事。首先,我建议使用valueProvider,以便在运行时为不同的orgid传递参数:

public interface MyOptions extends PipelineOptions {    
     @Description("Org Id")
     @Default.String("123-984-a")
     ValueProvider<String> getOrgId();
     void setOrgId(ValueProvider<String> orgID);   
}

然后从选项中阅读:

ValueProvider<String> orgId = options.getOrgId();

为了在ParDo中访问它,可以将其作为参数传递给构造函数,如文档中的示例:

someDataRows.apply( "Package into a list", ParDo.of( new CustomFn(orgId)));

其中customfn的构造函数将其作为参数,并将其存储在valueProvider中,以便可以从PARDO中访问。注意,现在需要使用orgid.get():

static class CustomFn extends DoFn<String, String> {
    // access options from wihtin the ParDo
    ValueProvider<String> orgId;
    public CustomFn(ValueProvider<String> orgId) {
        this.orgId = orgId;
    }

    @ProcessElement
    public void processElement( ProcessContext c ) {
      LOG.info( "Hello? " );
      LOG.info( "ORG ID: " + orgId.get() );
    }
}

现在您可以设置模板,并用以下命令调用它:

gcloud dataflow jobs run $JOB_NAME \
    --gcs-location gs://$BUCKET/templates/$TEMPLATE_NAME \
    --parameters orgId=jomama47

这应该按预期工作:

 类似资料:
  • 问题内容: 如何从项目管道访问settings.py中的scrapy设置。文档中提到可以通过扩展程序中的搜寻器访问它,但是我看不到如何在管道中访问搜寻器。 问题答案: 从内部访问Scrapy设置(如中所定义)的方法很简单。所有其他答案都太复杂了。原因是对Scrapy文档的维护非常差,加上许多最新的更新和更改。在“设置”文档“ 如何访问设置 ”中,或者在“设置API”中,都没有给出任何可行的示例。这

  • 我有一个Jenkins管道工作,它被配置为检查一个git回购和一个特定的本地分支。 两个值都为“null”

  • 我想在groovy类中访问“bat”或“echo”之类的管道步骤,但我并不真正了解“script”变量的用法。这是一个只有这个类的非常小的存储库,我在groovy测试中调用这个类(spock测试)。有人能告诉我“script”变量的正确用法以及调用应该是什么样子吗? 软件安装。groovy公司 } ps-我有一个Jenkinsfile,但它只包含一个gradle调用来运行测试。

  • 我在Azure DevOps的发布定义中为我的应用程序运行UI测试。我生成测试报告。我决定将它保存在build目录中比较方便(错误的假设?)。报告所在的目录为:

  • 问题内容: 我正在将Kendo下拉列表从现有代码转换为Kendo multiselect。 角色代码:当前为Dropdownlist(转换为Kendo multiselect)。 我没有得到正确的输出。 我有以下代码: 下面是获取角色代码的控制器代码: 如您所见,我尝试在上面的代码中使用多选功能。但这没有用。 问题答案: 下面的代码为我工作:

  • 但我不知道如何使用构建管道中的脚本步骤访问服务连接。例如,假设我有一个服务连接,表示Azure服务主体的凭据。我想在脚本步骤中访问这些凭据。 如何编写利用它们的脚本步骤?