当前位置: 首页 > 知识库问答 >
问题:

为Flink集群中的插件添加自定义依赖项

赖渊
2023-03-14

我有一个Flink会话集群(作业管理器任务管理器),版本1.11.1,配置了log4j控制台。属性包括Kafka appender。此外,在作业管理器和任务管理器中,我都启用了flink-s3-fs-hadoop内置插件。

我已经将kafka客户端jar添加到flink/lib目录,这是容器运行所必需的。但在实例化S3插件(并初始化记录器)时,我仍然会遇到类下加载错误。

原因:org.apache.kafka.common.config.ConfigException:无效值org.apache.kafka.common.serialization.ByteArraySerializer配置key.serializer:类org.apache.kafka.common.serialization.无法找到ByteArraySerializer。

(底部的完整堆栈跟踪)

据我所知,插件有一个专用的动态类加载,它与系统类加载分离。因此,我在flink-conf.yaml文件中添加了以下配置:

classloader.parent-first-patterns.additional: org.apache.kafka
classloader.resolve-order: parent-first

但错误仍然出现。

在调试时,我没有看到额外的模式被添加到插件类加载器的“alloadFlink Packages”中。

我错过了什么?

java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3FileSystemFactory could not be instantiated
        at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
        at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.ExceptionInInitializerError
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:481) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.logging.log4j.core.appender.mom.kafka.DefaultKafkaProducerFactory.newKafkaProducer(DefaultKafkaProducerFactory.java:40) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.startup(KafkaManager.java:136) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.start(KafkaAppender.java:164) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:304) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:579) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.<clinit>(AbstractS3FileSystemFactory.java:88) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more
[2020-12-06T09:15:45,892][Error] {} [o.a.f.c.f.FileSystem]: Failed to load a file system via services
java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3AFileSystemFactory could not be instantiated
        at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
        at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3hadoop.S3FileSystemFactory
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more

共有1个答案

邹晟睿
2023-03-14

正如您所说,Flink插件是通过自己的类加载器加载的,并且与任何其他插件完全隔离。

如果我们深入研究源代码,那么在集群启动时会使用另一个键(遗憾的是,它没有在任何地方记录):

plugin.classloader.parent-first-patterns.additional

这允许您使用PluginClassLoader将外部jar添加到类路径中

声明用途:https://github.com/apache/flink/blob/53a4b4407816c2780fed2f8995affbebc1f58c3c/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java#L156-L174

将以下内容添加到flink-conf.yaml.

plugin.classloader.parent-first-patterns.additional: org.apache.kafka

那应该可以了

 类似资料:
  • 我将JAR文件粘贴在Android Studio的文件夹中。 我在Android Studio中将此文件添加为Jar依赖项。 build.gradle文件包含JAR的引用编译文件('libs/my-custom-jar-1.0.0.jar')

  • null 0.0.1-快照 _remote.repositories Maven-metadata-local org-utility-0.0.1-snapshot.jar org-utility-0.0.1-snapshot.pom org-utility-0.0.1-snapshot-jar-with-dependencies.jar 因此,我能够将jar“retrieve org-utili

  • 如何编写自定义gradle插件来处理自定义存储库中自定义模块描述符中的依赖关系?gradle文档说了以下内容,但我还没有找到任何可以告诉我如何操作的内容。 即使您的项目使用的是自定义依赖关系管理系统或类似Eclipse的东西。类路径文件作为依赖关系管理的主数据,编写Gradle插件以在Gradle中使用此数据非常容易。 我一直在为ATG项目维护一个定制的常春藤解析器(源自此项目),但Gradle最

  • 我正在做hazelcast监控服务,我需要为每个集群成员添加客户名称,以了解哪个模块不在集群中。 我想要成员的常量名称,而不是 这能做到吗?

  • 我试图为其他gradle项目使用的自定义gradle插件构建一个jar。我使用java编写插件。我有一个问题,包括我的jar中的依赖项。如果我使用下面的代码构建jarbuild.gradle 在一个项目中应用插件时,我得到了一个针对guava类的NoClassDefFound异常。如果我在 它表示未找到Id为“my Plugin”的。如何在gradle插件jar中包含依赖项?

  • 问题内容: 在迁移到Play 1.2时,我有些失落。我们的应用程序中有一组自定义模块。在Play 1.1.1中,我们使用了以下结构: 并且application.conf引用了模块作为相对路径 (../ module1) 如何在Play 1.2中执行相同操作?我看到我应该使用 dependencies.yml 文件,但是在官方文档中找不到关于此主题的任何信息。 提前致谢 编辑: Google Gr