当前位置: 首页 > 知识库问答 >
问题:

Google Cloud数据流、TextIO和Kerberize HDFS

邴修远
2023-03-14

我正在尝试使用Dataflow运行器上的BeamJava2.22.0从kerberize HDFS读取TSV文件。我正在使用带有kerberos组件的Dataproc集群来提供kerberize HDFS。我得到的错误是:

Error message from worker: org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]

我正在按如下方式配置管道(注意,我已经配置了java.security.krb5.realm/kdc,我认为这使得在worker上不需要krb5.conf。我的hdfstextioptions扩展了HadoopFileSystemOptions,这允许我用Hadoop配置初始化管道。

我从GCS位置获取一个(当前未加密的)keytab,并使用它来初始化UserGroupInformation

  public static void main(String[] args) throws IOException {
    System.setProperty("java.security.krb5.realm", "MY_REALM");
    System.setProperty("java.security.krb5.kdc", "my.kdc.hostname");

    HdfsTextIOOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(
            HdfsTextIOOptions.class);

    Storage storage = StorageOptions.getDefaultInstance().getService();
    URI uri = URI.create(options.getGcsKeytabPath());
    System.err.println(String
        .format("URI: %s, filesystem: %s, bucket: %s, filename: %s", uri.toString(),
            uri.getScheme(), uri.getAuthority(),
            uri.getPath()));
    Blob keytabBlob = storage.get(BlobId.of(uri.getAuthority(),
        uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath()));
    Path localKeytabPath = Paths.get("/tmp", uri.getPath());
    System.err.println(localKeytabPath);

    keytabBlob.downloadTo(localKeytabPath);

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020");
    conf.set("hadoop.security.authentication", "kerberos");

    UserGroupInformation
        .loginUserFromKeytab(options.getUserPrincipal(), localKeytabPath.toString());
    UserGroupInformation.setConfiguration(conf);

    options.setHdfsConfiguration(ImmutableList.of(conf));

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from(options.getInputFile()))
    ...

我是否缺少一些基本的配置,以便从Beam on Dataflow正确访问kerberize HDFS?

谢谢

共有1个答案

冀翰翮
2023-03-14

看起来您正在构建时在管道中设置系统属性。您需要确保在管道执行期间也设置了这些属性。

一个简单的方法是编写您自己的Jvm初始化器来设置这些属性。工作人员将使用Java的ServiceLoader实例化您的Jvm初始化器。

 类似资料:
  • 我使用的是Spring Cloud Edgware和Spring Cloud DataFlow 1.2.3。 我在contentType和originalContentType上遇到了问题,尽管我有一个解决方案,但我不明白为什么需要它。 现在需要在datasink和另一个rabbit Sink之间引入一个桥梁。新的桥流很简单: 兔源兔库 其中rabbit-source从前面提到的DataLink中

  • 我正在构建一个事件驱动的微服务架构,它应该是云不可知的(尽可能多)<由于这最初是在GCP中进行的,我不想在配置和所有这些方面花费太长时间,我打算直接将GCP的发布/订阅用于事件队列,并在稍后处理其他云实现,但后来我遇到了Spring云数据流,这看起来很好,因为这些是Spring Boot微服务,我需要一种方法来协调它们 Spring Cloud数据流是否支持Pub Sub作为事件队列? 在配置和设

  • 严格的单向数据流是 Redux 架构的设计核心。 这意味着应用中所有的数据都遵循相同的生命周期,这样可以让应用变得更加可预测且容易理解。同时也鼓励做数据范式化,这样可以避免使用多个且独立的无法相互引用的重复数据。 如果这些理由还不足以令你信服,读一下 动机 和 Flux 案例,这里面有更加详细的单向数据流优势分析。虽然 Redux 不是严格意义上的 Flux,但它们有共同的设计思想。 Redux

  • 有时,您希望发送非常巨量的数据到客户端,远远超过您可以保存在内存中的量。 在您实时地产生这些数据时,如何才能直接把他发送给客户端,而不需要在文件 系统中中转呢? 答案是生成器和 Direct Response。 基本使用 下面是一个简单的视图函数,这一视图函数实时生成大量的 CSV 数据, 这一技巧使用了一个内部函数,这一函数使用生成器来生成数据,并且 稍后激发这个生成器函数时,把返回值传递给一个

  • 那里!我是Cloud-DataFlow的新手。 我使用DataflowPipelineRunner读取csv文件并将结果输出到BigQuery。当csv文件的大小很小(只有20条记录,小于1MB)时,它工作得很好,但当文件的大小变大(超过1000万条记录,约616.42 MB)时,它出现了OOM错误。 以下是错误消息: oder.decodeOutOfMemoryError:Java堆空间oder

  • 我正在尝试使用google cloud sql和云endpoint开发一个应用程序,从google cloud sql文档中我发现google loud sql的连接器不提供任何连接池机制,我尝试在线搜索以获得任何可能的教程或文档,这些教程或文档提供了池机制,但没有结果,google文档只是指出,您应该在finally块中关闭连接,而不需要任何连接池配置。我还遇到了BoneCp、TomcatDbC