我正在努力将hdfs集成到flink。
Scala二进制版本:2.12,
Flink(群集)版本:1.10.1
这里是HADOOP_CONF_DIR;
hdfs的配置在这里;
这个配置和HADOOP_CONF_DIR在任务管理器中也是一样的。
pom.xml;
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.10.0</version>
</dependency>
</dependencies>
我试图从hdfs获取拼花地板文件的全部内容,我的示例代码就在那里;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
Types.MessageTypeBuilder builder = Types.buildMessage();
MessageType messageType = builder
.required(INT64).named("column1")
.required(BINARY).as(UTF8).named("column2")
.required(INT64).named("column3")
.required(INT64).named("column4")
.required(BINARY).as(UTF8).named("column5")
.required(BINARY).named("column6")
.named("AppendTest");
ParquetTableSource parquetTableSource = ParquetTableSource.builder()
.path("hdfs://hdfs:8020/historic_data/data.parquet")
.withConfiguration(hadoopConf)
.forParquetSchema(messageType)
.build();
tEnv.registerTableSource("datatable", parquetTableSource);
Table table = tEnv.sqlQuery("select * from datatable");
DataSet<Row> tempDataSet = tEnv.toDataSet(table, Row.class);
tempDataSet.print();
env.execute("Job name - short desc.");
错误就在这里;
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:271)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:807)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:256)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
... 22 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
... 27 more
这是一个奇怪的部分,正如您在lib文件夹下看到的Hadoop uber jar一样
这就是我如何服从这份工作;
docker exec-it作业管理器env HADOOP_CONF_DIR=/tmp/hadoopconf flink run-Cfile:///opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar-d/tmp/core-batch-1.0.0。罐子
我也尝试了用Flink UI推送工作,但结果是一样的。
任何帮助都将不胜感激。
谢谢
@Aykut,我不确定这会有多大帮助,但我最近一直在测试Flink 1.12,它是通过我在GitHub上的ambari Flink服务创建的自定义ambari服务交付给HDP3的。当我将Flink安装到安装了Thread/hdfs的基本ambari集群中时,所有依赖项都可以完美地工作。一旦纱线flink应用程序在纱线集群中运行,我的应用程序测试的其余部分就工作得很好。纱线上的参考燧石。
以下是我的Flink用户环境变量:
export HADOOP_CONF_DIR=/etc/hadoop/conf; export HADOOP_CLASSPATH=/usr/hdp/3.1.4.0-315/hadoop/conf:/usr/hdp/3.1.4.0-315/hadoop/lib/*:/usr/hdp/3.1.4.0-315/hadoop/.//*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/./:/usr/hdp/3.1.4.0-315/hadoop-hdfs/lib/*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/.//*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/lib/*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/.//*:/usr/hdp/3.1.4.0-315/hadoop-yarn/./:/usr/hdp/3.1.4.0-315/hadoop-yarn/lib/*:/usr/hdp/3.1.4.0-315/hadoop-yarn/.//*:/usr/hdp/3.1.4.0-315/tez/*:/usr/hdp/3.1.4.0-315/tez/lib/*:/usr/hdp/3.1.4.0-315/tez/conf:/usr/hdp/3.1.4.0-315/tez/conf_llap:/usr/hdp/3.1.4.0-315/tez/doc:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-2.8-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib:/usr/hdp/3.1.4.0-315/tez/man:/usr/hdp/3.1.4.0-315/tez/tez-api-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-common-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-dag-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-examples-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-history-parser-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-javadoc-tools-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-job-analyzer-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-mapreduce-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-protobuf-history-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-internals-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-library-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-tests-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-cache-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-acls-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-fs-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/ui:/usr/hdp/3.1.4.0-315/tez/lib/async-http-client-1.9.40.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-cli-1.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-codec-1.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections-3.2.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections4-4.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-io-2.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-lang-2.6.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-math3-3.1.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/gcs-connector-1.9.10.3.1.4.0-315-shaded.jar:/usr/hdp/3.1.4.0-315/tez/lib/guava-28.0-jre.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-aws-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-datalake-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-hdfs-client-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-common-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-core-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-yarn-server-timeline-pluginstorage-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-client-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-json-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jettison-1.3.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jsr305-3.0.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/metrics-core-3.1.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/protobuf-java-2.5.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/RoaringBitmap-0.4.9.jar:/usr/hdp/3.1.4.0-315/tez/lib/servlet-api-2.5.jar:/usr/hdp/3.1.4.0-315/tez/lib/slf4j-api-1.7.10.jar:/usr/hdp/3.1.4.0-315/tez/lib/tez.tar.gz;
这是我的执行命令:
/opt/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/bin/flink run javaProgram.jar
下面是示例java代码(通过Flink将s3文件摄取到hdfs):
package org.myorg.quickstart;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class S3FlinkIngest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = streamExecutionEnvironment.readTextFile("s3a://s3Bucket/input.xml");
dataStream.writeAsText("hdfs://hadoopHost:8020/user/flink/ingest.xml", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);;
streamExecutionEnvironment.execute("S3 Flink Ingest");
} }
Linux文件管理从用户的层面介绍了Linux管理文件的方式。Linux有一个树状结构来组织文件。树的顶端为根目录(/),节点为目录,而末端的叶子为包含数据的文件。当我们给出一个文件的完整路径时,我们从根目录出发,经过沿途各个目录,最终到达文件。 我们可以对文件进行许多操作,比如打开和读写。在Linux文件管理相关命令中,我们看到许多对文件进行操作的命令。它们大都基于对文件的打开和读写操作。比如c
1、文件系统层次结构 现代操作系统有多种文件系统类型,因此文件系统的层次结构也不尽相同。 文件系统为用户提供与文件及目录有关的调用,如新建、打开、读写、关闭、删除文件,建立、删除目录等。此层由若干程序模块组成,每一模块对应一条系统调用,用户发出系统调用时,控制即转入相应的模块。 文件目录系统的主要功能是管理文件目录,其任务有管理活跃文件目录表、管理读写状态信息表、管理用户进程的打开文件表、管理与组
本文向大家介绍Vue多系统切换实现方案,包括了Vue多系统切换实现方案的使用技巧和注意事项,需要的朋友参考一下 前言 公司分好几个后台模块,统一使用vue+elementUi框架开发,每一个后台模块都是单独团队开发的。并且几个系统整体的风格、布局一样的,包括左侧边栏,上方的面包屑等 用户在使用的时候,可能要切换别的系统就要在浏览器里,新打开窗口,再输入网址,回车。 总结来说,低效,所以现在想将几个
本文向大家介绍查看linux文件系统块大小的实现方法,包括了查看linux文件系统块大小的实现方法的使用技巧和注意事项,需要的朋友参考一下 在linux系统上,可以用命令tune2fs ,测试如下 上面Block size即为块大小。 在WINDOWS系统上,可以用命令fsutil来查看,测试如下: C:\Documents and Settings\ct2>fsutil --help --hel
我知道这被问了好几次,但看起来我尝试的一切都不奏效。 我也是新手,如果我犯了任何错误,我很抱歉。 我尝试使用Gradle和eclipse构建java jar。 Gradle构建如下所示: 但我总是以: 在DefaultDependencyHandler类型的对象上找不到参数[org.springframework.boot: spring-boot-starter-data-jpa]org.gra
问题内容: 我的旧EC2实例存在一些未知问题,因此无法再使用它了。因此,我试图从旧卷的快照创建一个新的EBS卷并将其装入新实例。这正是我所做的: 从旧快照的快照创建了一个新卷。 创建了一个新的EC2实例,并将卷附加为(或)。 SSH进入实例,并尝试使用以下方法挂载旧卷: 输出为: 我知道我应该将fileytem指定为,但是该卷包含许多重要数据,因此我无法使用对其进行格式化。如果我尝试(不格式化),