当前位置: 首页 > 知识库问答 >
问题:

Flink Kafka源时间戳提取器的类加载

卫彭亮
2023-03-14

我正在尝试将Flink作业部署到基于Flink:1.4.1-hadoop27-scala\u 2.11-alpine映像的集群中。作业使用的是Kafka连接器源(flink-connector-Kafka-0.11),我试图为其分配时间戳和水印。我的代码与Flink Kafka连接器文档中的Scala示例非常相似。但FlinkKafkaConsumer011

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())

这在从IDE本地运行时非常有效。但是,在集群环境中,我遇到以下错误:

java.lang.ClassNotFoundException: com.my.organization.CustomWatermarkEmitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1863)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1746)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2037)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

我正在将我的工作构建为一个胖罐子,我已经验证它包含这个类。文档中的这个示例是否仅在CustomWatermarkEmitter类位于/opt/flink/lib/文件夹中时有效?

这是我解决问题的方法。但是必须单独构建这个类并将其放在 /opt/flink/lib中会使我的构建过程变得非常复杂,所以我想知道这是应该解决的方法还是有其他解决这个问题的方法?

例如,Flink文档中的这一部分暗示必须手动提供一些源UserCodeClassLoader?包括提供的Kafka来源?

据我在org中所见,它似乎在内部使用了“userCodeClassLoader”。阿帕奇。Flink。流式处理。连接器。Kafka。内部构件。AbstractFetcher:

            case PERIODIC_WATERMARKS: {
            for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());

                AssignerWithPeriodicWatermarks<T> assignerInstance =
                        watermarksPeriodic.deserializeValue(userCodeClassLoader);

                KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partitionState =
                        new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                                partitionEntry.getKey(),
                                kafkaHandle,
                                assignerInstance);

                partitionState.setOffset(partitionEntry.getValue());

                partitionStates.add(partitionState);
            }

编辑:

我已经创建了一个简单的项目,这个问题可以在这里重现:https://github.com/lragnarsson/flink-kafka-classpath-problem

为了复制,您需要docker和docker compose。

只是做:

  1. git克隆https://github.com/lragnarsson/flink-kafka-classpath-problem.git
  2. cd flink-kafka-classpath-问题/docker
  3. docker-撰写构建
  4. docker-组成
  5. 在浏览器中localhost:8081
  6. 提交包含的jar文件从目标/scala-2.11/flink-kafka-classpath问题汇编-0.1-SNAPSHOT. jar

这应该会导致异常java.lang.ClassNotFoundException:se.ragnarsson.lage.MyTimestampExtractor

共有1个答案

山越
2023-03-14

我想您偶然发现了Flink 1.4.1中引入的一个bug:https://issues.apache.org/jira/browse/FLINK-8741.

它将很快在1.4.2中修复。您可以尝试在1.4.2.rc2:https://github.com/apache/flink/tree/release-1.4.2-rc2

 类似资料:
  • 我正在尝试使用Kafka流来处理Kafka主题中的一些数据。数据来自Kafka0.11.0编写的Kafka主题,该主题没有嵌入时间戳。在网上读了一些书之后,我明白了我可以通过在自定义类中扩展类并将其传递到中来解决这个问题。 我是这样做的- 我基于github上的这段代码 但是,当我运行

  • 这个服务我已经测试了它,使用不同版本的Kafka(更高或等于0.10),它工作良好。 以下是我的配置: Spring:cloud:stream:kafka:streams:binder:brokers:${KAFKA_BROKERS}applicationid:email-MESSAGES-stream configuration:default.key.serde:org.apache.kafk

  • 问题内容: 是否有可能为本地运行的Java应用程序和作为applet和/或JNLP Webapps可靠地确定给定类的编译时间戳? 问题答案: 根据Java虚拟机规范,类文件格式不需要任何类型的时间戳,因此,您最好的办法是检查包含该类的类或Jar文件的修改时间。不幸的是,文件系统操作(尤其是在各种主机之间)可能无法保留此类时间戳。 我会说默认情况下没有可靠的方法。但是,您可以在构建过程中轻松地将这样

  • 我使用这个库在我的Android应用程序中使用FFmpeg。我正在从视频中提取帧,然后将它们添加到裁剪查看器中。因此,每一帧都需要表示视频中的某个时间帧。下面是我当前提取帧的ffmpeg代码: 感谢任何帮助

  • 我正在构建一个非常简单的KafkaStreams演示应用程序,以测试用例。 我无法升级我正在使用的Kafka broker(当前版本为0.10.0),并且有几条消息是由0.10.0之前的生产者编写的,因此我使用自定义时间戳提取器,我将其作为默认值添加到主类开头的配置中: 当从我的源主题消费时,这非常好。但是当使用聚合运算符时,我遇到了一个异常,因为当从内部聚合主题消费时,使用了的实现而不是自定义实

  • 我试图将日志文件索引到弹性搜索。所有日志条目都被索引到一个名为消息的字段中。@时间戳字段显示条目被索引的时间,而不是日志条目的时间戳。 我用grok processor创建了一个摄取管道来定义日志条目的模式。我尝试了几种模式,但都无法实现,尤其是因为我是新手。 日志样本 用grok吞食管道 我想要的就是能够从日志消息中提取时间戳,其他所有内容都可以被忽略、通配符或存储在一个变量中,比如消息。因此,