当前位置: 首页 > 知识库问答 >
问题:

Flink从hdfs读取数据

翟默
2023-03-14

我是Flink大学的一年级新生,我想知道如何从hdfs读取数据。有谁能给我一些建议或简单的例子吗?谢谢大家。

共有3个答案

汪兴旺
2023-03-14

在Centos7机器上使用Flink 1.13、Hadoop 3.1.2和Java 1.8.0,我可以从HDFS中读取数据HADOOP\u HOMEHADOOP\u CLASSPATH已经导出。我认为从1.11版开始,有些东西发生了变化。我甚至找不到一个简单的例子。因此,我举我的例子。

我添加到pom。xml以下依赖项

 <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.1.2</version>
 </dependency>

我的Scala代码:

package com.vbo.datastreamapi

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object ReadWriteHDFS extends App {
  
  val env = StreamExecutionEnvironment.getExecutionEnvironment


  val stream = env.readTextFile("hdfs://localhost:9000/user/train/datasets/Advertising.csv")

  stream.print()

  env.execute("Read Write HDFS")

}
左丘子平
2023-03-14

Flink可以读取HDFS数据,这些数据可以是文本、Json、avro等任何格式。对Hadoop输入/输出格式的支持是flink java maven模块的一部分,在编写flink作业时需要这些模块。

示例1:读取名为JsonSeries的文本文件并在控制台上打印

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.readTextFile("hdfs://localhost:9000/user/hadoop/input/JsonSeries.txt")
        .name("HDFS File read");
lines.print();

示例2:使用输入格式

DataSet<Tuple2<LongWritable, Text>> inputHadoop =
        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
                LongWritable.class, Text.class, "hdfs://localhost:9000/user/hadoop/input/JsonSeries.txt"));
inputHadoop.print();
彭嘉赐
2023-03-14

如果文件格式为文本文件格式,则可以使用“ExecutionEnvironment”对象中的“readTextFile”方法。

下面是各种数据源的示例。(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#data-(来源)

 类似资料:
  • 我一直在试图找到一个连接器,将数据从Redis读取到Flink。Flink的文档中包含了要写入Redis的连接器的描述。在我的Flink工作中,我需要从Redis读取数据。在使用ApacheFlink进行数据流传输时,Fabian提到可以从Redis读取数据。可用于此目的的接头是什么?

  • 所以我必须检索存储在HDFS中的文件的内容,并对其进行某些分析。 问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是Flink的新手,这只是一个测试,以确保我正确读取了文件) HDFS中的文件是纯文本文件。这是我的密码: 在我运行/tmp之后,它没有输出。 这是一个非常简单的代码,我不确定它是否有问题,或者我只是做了一些别的错误。正如我所说,我对Flink完全是新手 此

  • 如何使用scala语言从hdfs数据集中读取数据?数据是任何记录有限的“CSV”文件。

  • 我必须使用Flink作为流引擎处理来自Kafka的数据流。为了对数据进行分析,我需要查询Cassandra中的一些表。做这件事最好的方法是什么?我一直在Scala中寻找这样的例子。但是我找不到任何数据。如何使用Scala作为编程语言在Flink中读取来自Cassandra的数据呢?使用apache flink Java API将数据读写到cassandra中也有同样的问题。答案中提到它有多种方法。

  • 问题内容: 这是我的问题:我在HDFS中有一个文件,该文件可能很大(=不足以容纳所有内存) 我想做的是避免必须将此文件缓存在内存中,而仅像逐行处理常规文件一样逐行处理它: 我正在寻找是否有一种简单的方法可以在不使用外部库的情况下正确完成此操作。我可能可以使它与libpyhdfs或python- hdfs一起使用, 但我想尽可能避免在系统中引入新的依赖项和未经测试的库,尤其是因为这两个似乎都没有得到

  • 我试图用以下链接中提供的信息将Cassandra作为Flink中的数据来源: null 异常跟踪-->