当前位置: 首页 > 知识库问答 >
问题:

为什么Apache Beam中的CustomOptions不继承DataflowPipelineOptions默认属性?

华项明
2023-03-14

我是Apache Beam的新手,并尝试使用DirectRunner和DataflowRunner运行示例读写程序。在我的用例中,CLI参数很少,为了实现这一点,我创建了一个扩展PipelineOptions的接口“CustomOptions.java”。

使用DirectRunner,程序运行良好,但使用DataflowRunner,它说“接口CustomOptions缺少一个名为‘项目’的属性”。

pom。xml

<dependencies>
    <dependency>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.2.0</version>
        <type>maven-plugin</type>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.16.0</version>
    </dependency>

</dependencies>

定义选项。java(接口)

import org.apache.beam.sdk.options.PipelineOptions;

public interface CustomOptions extends PipelineOptions {

    String getInput();
    void setInput(String value);

    String getOutput();
    void setOutput(String value);
}

字数。Java语言

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WordCount {

    public static void main(String args[]) {
        PipelineOptionsFactory.register(CustomOptions.class);
        CustomOptions options = PipelineOptionsFactory.fromArgs(args).as(CustomOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("Read", TextIO.read().from(options.getInput()))
                .apply("Write", TextIO.write().to(options.getOutput()));

        p.run();
    }
}

命令:

DirectRunner (Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath
DataflowRunner (Not Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath --runner=DataflowRunner --stagingLocation=gs://<tmp_path> --project=<projectId>

错误:

Exception in thread "main" java.lang.IllegalArgumentException: Class interface CustomOptions missing a property named 'project'.
    at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1625)
    at org.apache.beam.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:115)
    at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:298)
    at WordCount.main(WordCount.java:13)

我尝试的第二件事是用DataflowPipelineOptions而不是PipelineOptions扩展CustomOptions。使用此选项,我也会遇到一个错误:

Exception in thread "main" java.lang.IllegalArgumentException: No filesystem found for scheme gs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
    at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:215)
    at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:734)
    at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1069)
    at WordCount.main(WordCount.java:15)

第二次试用还提出了一个问题,即不能使用DirectRunner和DataflowRunner执行相同的代码。因为在第二种情况下,“projectId”是一个强制参数,不会在DirectRunner中指定。

共有1个答案

柴默
2023-03-14

经过几次尝试和错误,我认为我得到了正确的结果。我使用的java类与问题中提到的相同,即扩展CustomOptions。java和PipelineOptions。我做的唯一改变是在pom。xml。

现在我正在使用带有少量额外配置的maven阴影插件,而不是maven汇编插件。通过这些我实现了:1.相同的jar可以与DirectRunner或DataflowRunner一起使用。2.说明我想从命令行执行哪个主类。

上一个“pom”。xml':

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id> <!-- this is used for inheritance merges -->
                    <phase>package</phase> <!-- bind to the packaging phase -->
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <!-- add Main-Class to manifest file -->
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.dh.WordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>

<dependencies>
    <dependency>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.2.0</version>
        <type>maven-plugin</type>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.16.0</version>
    </dependency>

</dependencies>

新的“pom.xml”:

<build>
    <plugins>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>

<dependencies>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.16.0</version>
    </dependency>

</dependencies>

当我读到这个答案时,这就成为可能:Google Dataflow“没有为scheme gs找到文件系统

 类似资料:
  • 问题内容: 免责声明 :这是 不是 这个情况(而错误的声音是一样的):)从类型为java.util.Set和java.util.List的类继承了spliterator(默认无关这就是为什么: 考虑两个接口(在“ ” 包中) 我很清楚为什么我们不能声明这样的类: (!) 但是我不能理解有关 类型变量的 限制: 出现错误:。 为什么我不能定义这样的类型变量?在这种情况下,为什么要关心无关的默认值?什

  • 下面的代码引发: 线程“main”java.lang.ClassCastException中的异常:不能将类子级转换为类java.util.List(子级位于加载器“app”的未命名模块中;java.util.List位于加载器“bootstrap”的模块java.base中) 我真的不知道它为什么会这样做。我想我写的代码是正确的。 请帮助我理解这一点,以及如何解决这个问题。

  • 问题内容: 为什么以下代码不起作用(Python 2.5.2)? 我想创建一个类似的类,但具有不同的功能。显然我的函数永远不会被调用。而是调用原始文件并失败,因为它需要3个参数,而我传入了一个。 这里发生了什么?这是一个线索吗? 谢谢! 问题答案: 关于其他几个答案,这与用C本身实现的日期无关。该方法不做任何事情,因为它们是 不可变的 对象,因此构造函数()应该完成所有工作。您会看到相同的行为将i

  • 问题内容: 从字体的color属性继承边框颜色是否正常?我很惊讶地发现: 给我一个带红色边框的div。通常不指定颜色将默认为黑色。这个奇怪的继承是什么? 问题答案: 根据相关背景和边框模块规范的第4.1节,初始值为:border-colorcurrentColor 根据相关背景和边框模块规范的第4.1节,初始值为: CSS颜色模块-4.4。颜色关键字 CSS1和CSS2将属性的初始值定义为属性的值

  • 问题内容: 假设有以下几种类型: 在这种情况下,如果执行,将打印“ hi from foo”。为什么的实现优先?不继承自,因为如果只实现,则将调用实现?因此,仍然不编译代码是有意义的。另外,由于应该具有的实现,为什么我不能像这样重写它: 尝试这样做时会发生以下错误: 错误的类型限定符Bar在默认的超级调用方法中,sayHi()在Foo中被覆盖 问题答案: 在JLS 9.4.1中 几乎使用您的确切示

  • 本文向大家介绍Python中的pathlib.Path为什么不继承str详解,包括了Python中的pathlib.Path为什么不继承str详解的使用技巧和注意事项,需要的朋友参考一下 起步 既然所有路径都可以表示为字符串,为什么 pathlib.Path 不继承 str ? 这个想法的提出在 https://mail.python.org/pipermail//python-ideas/201