当前位置: 首页 > 知识库问答 >
问题:

如何使用ApacheFlink读取HDFS中的拼花文件?

勾通
2023-03-14

我只找到TextInputFormat和CsvInputFormat。那么,如何使用ApacheFlink读取HDFS中的拼花文件呢?

共有1个答案

双俊人
2023-03-14

好啊我已经找到了一种通过ApacheFlink读取HDFS中拼花地板文件的方法。

>

  • 您应该在pom.xml中添加以下依赖项

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_2.11</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.10.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>3.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
    

    创建一个avsc文件来定义模式。Exp:

        {"namespace": "com.flinklearn.models",
         "type": "record",
         "name": "AvroTamAlert",
         "fields": [
            {"name": "raw_data", "type": ["string","null"]}
         ]
        }
    

    运行"java-jar D:\avro-tools-1.8.2.jar编译模式alert.avsc."以生成Java类并将AvroTamAlert.java复制到您的项目中。

    使用AvroParquetInputFormat读取hdfs中的拼花文件:

    class Main {
        def startApp(): Unit ={
            val env = ExecutionEnvironment.getExecutionEnvironment
    
            val job = Job.getInstance()
    
            val dIf = new HadoopInputFormat[Void, AvroTamAlert](new AvroParquetInputFormat(), classOf[Void], classOf[AvroTamAlert], job)
            FileInputFormat.addInputPath(job, new Path("/user/hive/warehouse/testpath"))
    
            val dataset = env.createInput(dIf)
    
            println(dataset.count())
    
            env.execute("start hdfs parquet test")
        }
    }
    
    object Main {
        def main(args:Array[String]):Unit = {
            new Main().startApp()
        }
    }
    

  •  类似资料:
    • 所以我必须检索存储在HDFS中的文件的内容,并对其进行某些分析。 问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是Flink的新手,这只是一个测试,以确保我正确读取了文件) HDFS中的文件是纯文本文件。这是我的密码: 在我运行/tmp之后,它没有输出。 这是一个非常简单的代码,我不确定它是否有问题,或者我只是做了一些别的错误。正如我所说,我对Flink完全是新手 此

    • 则错误如下: AttributeError:“property”对象没有属性“parquet”

    • 我是大数据生态系统的新手,有点起步。 我读过几篇关于使用spark流媒体阅读Kafka主题的文章,但我想知道是否可以使用spark作业而不是流媒体阅读Kafka主题?如果是的话,你们能帮我指出一些可以让我开始学习的文章或代码片段吗。 问题的第二部分是以拼花格式向hdfs写信。一旦我读了Kafka的书,我想我会有一个rdd。将此rdd转换为数据帧,然后将数据帧写入拼花文件。这是正确的方法吗。 感谢您

    • 如何读取带有条件作为数据帧的分区镶木地板, 这工作得很好, 分区存在的时间为< code>day=1到day=30是否可能读取类似于< code>(day = 5到6)或< code>day=5,day=6的内容, 如果我输入< code>*,它会给出所有30天的数据,而且太大了。

    • 使用scala或pyspark读取hadoop中存储的拼花地板文件时,出现错误: 或 导致相同的错误。 错误消息非常清楚必须做什么:无法推断拼花的模式。必须手动指定。;。但是我在哪里可以指定它呢? Spark 2.1.1、Hadoop 2.5、数据帧是在pyspark的帮助下创建的。文件被划分为10个peace。

    • 我们正在寻找一种解决方案,以便创建一个外部配置单元表,根据parquet/avro模式从parquet文件中读取数据。 换句话说,如何从拼花/avro模式生成hive表? 谢谢:)