当前位置: 首页 > 知识库问答 >
问题:

Flink-没有用于scheme:hdfs的文件系统

周马鲁
2023-03-14

我目前正在开发一个Flink 1.4应用程序,它从Hadoop集群读取一个Avro文件。然而,在我的IDE上以本地模式运行它是非常好的。但当我将其提交给Jobmanager Flink时,它总是失败,并显示以下消息:

java.io.IOException: Error opening the Input Split hdfs://namenode/topics/CaseLocations/partition=0/CaseLocations+0+0000155791+0000255790.avro [0,16549587]: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:110)
at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:54)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:864)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop File System abstraction does not support scheme 'hdfs'. Either no file system implementation exists for that scheme, or the relevant classes are missing from the classpath.
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:102)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
... 2 more
Caused by: java.io.IOException: No FileSystem for scheme: hdfs
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:99)
... 3 more

我使用官方的Flink Docker imageFlink:1.4.0-hadoop28-scala_2.11运行集群,该集群应该已经包含Hadoop发行版。

我还试图将依赖项添加到我的应用程序jar中,但这也没有帮助。以下是我的sbt依赖项:

val flinkVersion = "1.4.0"
val hadoopVersion = "2.8.1"
val providedDependencies = Seq(
    "org.apache.flink" %% "flink-clients" % flinkVersion,
    "org.apache.flink" %% "flink-scala" % flinkVersion,
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)
val compiledDependencies = Seq(
    "org.apache.flink" % "flink-hadoop-fs" % flinkVersion,
    "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion,
    "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
    "org.apache.flink" % "flink-avro" % flinkVersion,
    "org.apache.flink" %% "flink-table" % flinkVersion,
    "org.scalaj" %% "scalaj-http" % "2.2.1"
)

此外,文件系统类包含在myMETA-INF/services/org中。阿帕奇。hadoop。财政司司长。文件系统

我是不是漏了什么?官方留档帮不了我。

提前谢谢

共有1个答案

陶朝明
2023-03-14

首先,需要一个HDFS集群。

其次,您需要检查flink-shaded-hadoop-2-uber-xxx。xx。FLINK_HOME/lib下的罐子。

如果您计划将Apache Flink与Apache Hadoop一起使用(在Thread上运行Flink、连接到HDFS、连接到HBase或使用一些基于Hadoop的文件系统连接器),则选择捆绑匹配Hadoop版本的下载,下载与您的版本匹配的可选预捆绑Hadoop,并将其放在Flink的lib文件夹中,或者导出HADOOP_类路径。

 类似资料:
  • 问题内容: 我正在尝试使用hadoop 运行一个简单的程序,出现此错误 代码: 指向文件,并且配置对象正在打印- 问题答案: 这是插件破坏事情的典型情况。 为什么这发生在我们身上 不同的JAR()各自目录中包含一个不同的文件。该文件列出了要声明的文件系统实现的规范类名(这称为通过实现的服务提供者接口,请参见)。 当使用时,它将所有的JAR合并为一个,并且全部覆盖彼此。这些文件仅保留其中一个(添加的

  • 我有一个桶,里面有几个小的拼花文件,我想把它们合并成一个更大的文件。 要完成此任务,我想创建一个spark作业来消费并写入一个新文件。 知道怎么了吗?

  • 我是Apache Flink CEP的新手,我正在努力检测一个简单的事件缺失。 我试图检测的是,具有特定ID的CurrencyEvent类型的事件在一定时间内不会发生。我想每次在3000ms之后事件没有发生时检测没有这样的事件。 我的模式代码如下所示: 所以现在我的想法是使用超时函数来检测超时事件: 我的测试源使用事件时间戳和水印,如下所示: 我在用TimeCharacteristics.Even

  • 我们在Azure中运行HDInsight集群,但它不允许在集群创建时向上旋转边缘/网关节点。所以我创建这个边缘/网关节点的方法是安装 然后我复制了 但是当我运行时,我得到以下错误 这里是完整的堆栈https://gist.github.com/anonymous/ebb6c9d71865c9c8e125aadbbdd6a5bc 我不确定这里缺少了哪个包/罐子。

  • 我正在尝试建立hadoop和Azure存储之间的联系。我已经在这里提到的core-site.xml中添加了属性:Link,仍然得到错误 Scheme:wasb没有文件系统 感谢任何帮助!

  • 提纲mount.ceph monaddr1[,monaddr2,...]:/[subdir] dir [ -o options ] 描述 mount.ceph 是在 Linux 主机上挂载 Ceph 文件系统的简单助手。它只负责把监视器主机名解析为 IP 地址、从硬盘读取认证密钥,大多数实际工作由 Linux 内核客户端组件完成。事实上,无需认证的 Ceph 文件系统无需 mount.ceph 也