Question:

Restoring a job from a checkpoint

郎弘壮
2023-03-14

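I have a Flink job that consumes data from Kafka, applies a few stateless flatMaps, and produces data back to Kafka. It is a very low-volume job.

The job usually takes checkpoints without any problem until it needs to restore from one; at that point it simply fails to restore state, with the stack trace below.

The state is very small. I believe it is only the Kafka offsets, and the job runs with at-least-once semantics.

All operators have .uid() set, and I am completely out of ideas.

This is the error when trying to restart from a checkpoint: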

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_fb0ea8b0a502b80e8c29508f37436fa7_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:322) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    ... 9 more
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:80) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.io.Input.readUtf8Chars_slow(Input.java:835) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.io.Input.readUtf8Chars(Input.java:828) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.io.Input.readString(Input.java:785) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:164) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:154) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:763) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:120) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:122) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:793) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:322) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    ... 9 more

The task manager logs the following when taking checkpoints during normal operation:

WARN  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Falling back to default Kryo serializer because Chill serializer couldn't be found.
java.lang.reflect.InvocationTargetException: null
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:444) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:467) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:258) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:64) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:76) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:89) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:885) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: com.esotericsoftware.kryo.KryoException: Unable to resolve type variable: A
    at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:114) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:86) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.util.GenericsUtil.resolveType(GenericsUtil.java:41) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:263) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:242) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.serializers.CachedFields.addField(CachedFields.java:139) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.serializers.CachedFields.rebuild(CachedFields.java:99) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:82) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:68) ~[dpa-runner-0.5.28-20210610.142951-35.jar:?]
    at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    ... 35 more

1 answer

潘俊
2023-03-14

The IDE must have added this ****tty dependency on its own:

<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.45</version>
</dependency>

I removed it, and now everything runs as expected.
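
In the traces above, the Kryo frames resolve to dpa-runner-0.5.28-20210610.142951-35.jar while the Flink runtime frames resolve to flink-dist_2.12-1.12.2.jar, which suggests that Kryo classes packaged into the job jar were being used instead of the ones Flink ships with. A minimal diagnostic sketch for checking where a class is loaded from is shown here; the class name `ClassOrigin` and its helper methods are hypothetical, and the sketch assumes it is run with the job jar on the classpath (or that `describe` is called and logged from inside the job):

```java
import java.security.CodeSource;

/** Diagnostic sketch (hypothetical helper): report which jar or directory a class is loaded from. */
final class ClassOrigin {

    /** Returns the code-source location of cls, or a placeholder for JDK/bootstrap classes. */
    static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        return (source == null || source.getLocation() == null)
                ? "(bootstrap or unknown)"
                : source.getLocation().toString();
    }

    /** Resolves a class by name without initializing it and describes where it came from. */
    static String describe(String className) {
        try {
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            return className + " <- " + locationOf(cls);
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " <- not on classpath";
        }
    }

    public static void main(String[] args) {
        // Default to the two classes that appear in the stack traces above.
        String[] names = args.length > 0 ? args : new String[] {
            "com.esotericsoftware.kryo.Kryo",
            "org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

If the Kryo class resolves to the job jar rather than to the Flink distribution, running `mvn dependency:tree -Dincludes=com.esotericsoftware` should show which declared dependency brings it in.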
