Question:

Flink state backend fails to restore after a TaskManager failure

孟永望
2023-03-14

I am new to Flink and am implementing a pattern-recognition module (without using CEP for pattern matching) that reads a JSON stream from an EventHub topic and, when a pattern matches, pushes the result to another EventHub topic. The module works as follows:


  • Receive JSON payloads from an EventHub topic

    I am using a RichSourceFunction that reads patterns from an API and sends them to a broadcast stream.

    I am using Flink's BroadcastProcessFunction to process the data against the list of patterns available in broadcast state. I am not using keyed streams or any explicit state in my program, because there is no need for state; I only need to check whether certain values are present in the JSON.

    I have read that Flink maintains state internally even when no state is implemented explicitly.

    Below is my checkpoint configuration. On average I receive about one million payloads per hour, and sometimes more.

     env.enableCheckpointing(interval);
     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
     StateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath, true);
     env.setStateBackend(stateBackend);
     env.getCheckpointConfig().setCheckpointInterval(12000);
     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
     env.getCheckpointConfig().setCheckpointTimeout(120000);
     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
     env.getCheckpointConfig().enableExternalizedCheckpoints(org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    

    But when a TaskManager fails, the job tries to restore state from the state backend (I use RocksDB as my state backend), and it fails with the error below. I am using Flink 1.10.0 and Java 1.8.

    05:39:14.260 [Source: Custom Source -> Flat Map (5/12)] WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure cisco - Exception while restoring operator state backend for StreamSource_1171dea6747ab509fdaefbe74f7195af_(5/12) from alternative (1/1), will retry while more alternatives are available.
    org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
            at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565) ~[flink-statebackend-rocksdb_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) [flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) [flink-runtime_2.12-1.10.0.jar:1.10.0]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
    Caused by: java.io.IOException: Stream Closed
            at java.io.FileInputStream.read0(Native Method) ~[?:1.8.0_252]
            at java.io.FileInputStream.read(FileInputStream.java:207) ~[?:1.8.0_252]
            at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:68) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51) ~[flink-core-1.10.0.jar:1.10.0]
            at java.io.DataInputStream.readInt(DataInputStream.java:389) ~[?:1.8.0_252]
            at org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap(LinkedOptionalMapSerializer.java:86) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.readDefaultKryoSerializerClasses(KryoSerializerSnapshotData.java:208) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.createFrom(KryoSerializerSnapshotData.java:72) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.readSnapshot(KryoSerializerSnapshot.java:77) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:182) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:149) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            ... 15 more
    05:39:14.261 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map (5/12) 0b418e2ffcd028a58f39029d3f8be08e.
    05:39:14.261 [Source: Custom Source -> Flat Map (3/12)] WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure cisco - Exception while restoring operator state backend for StreamSource_1171dea6747ab509fdaefbe74f7195af_(3/12) from alternative (1/1), will retry while more alternatives are available.
    (BackendBuildingException stack trace identical to the one for subtask 5/12 above)
    05:39:14.262 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map (3/12) a973d1d62f5086d1126d83d81278cc0a.
    05:39:14.283 [Source: Custom Source -> Flat Map (1/12)] WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure cisco - Exception while restoring operator state backend for StreamSource_1171dea6747ab509fdaefbe74f7195af_(1/12) from alternative (1/1), will retry while more alternatives are available.
    org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
            at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565) ~[flink-statebackend-rocksdb_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) [flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) [flink-runtime_2.12-1.10.0.jar:1.10.0]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
    Caused by: java.io.IOException: Stream Closed
            at java.io.FileInputStream.read0(Native Method) ~[?:1.8.0_252]
            at java.io.FileInputStream.read(FileInputStream.java:207) ~[?:1.8.0_252]
            at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:68) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51) ~[flink-core-1.10.0.jar:1.10.0]
            at java.io.DataInputStream.readInt(DataInputStream.java:389) ~[?:1.8.0_252]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:165) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:182) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:149) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-core-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
            ... 15 more
    05:39:14.283 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map (1/12) c1a83f3812be2a4099737d6eee5b41d0.
    05:39:14.441 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Sink: Cassandra Sink (1/4) caadf9ad0d011d308659cf47a3b74cc4.
    05:40:36.616 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl cisco - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=85.333mb (89478482 bytes), taskOffHeapMemory=0 bytes, managedMemory=136.533mb (143165578 bytes), networkMemory=34.133mb (35791394 bytes)}, allocationId: f5741b19f3f1281ae65d67994dba045b, jobId: a0d922bbf1c20ed9417415827c32e1a3).
    05:40:36.617 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl cisco - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=85.333mb (89478482 bytes), taskOffHeapMemory=0 bytes, managedMemory=136.533mb (143165578 bytes), networkMemory=34.133mb (35791394 bytes)}, allocationId: 5a92c83b6a105b726105cb0432980be6, jobId: a0d922bbf1c20ed9417415827c32e1a3).
    05:40:36.618 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl cisco - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=85.333mb (89478482 bytes), taskOffHeapMemory=0 bytes, managedMemory=136.533mb (143165578 bytes), networkMemory=34.133mb (35791394 bytes)}, allocationId: dd952690f30c88860b451b1ce4e2fc6d, jobId: a0d922bbf1c20ed9417415827c32e1a3).
    05:40:36.618 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService cisco - Remove job a0d922bbf1c20ed9417415827c32e1a3 from job leader monitoring.
    05:40:36.618 [flink-akka.actor.default-dispatcher-95] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Close JobManager connection for job a0d922bbf1c20ed9417415827c32e1a3.
    05:40:36.621 [flink-akka.actor.default-dispatcher-110] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Close JobManager connection for job a0d922bbf1c20ed9417415827c32e1a3.
    05:40:36.621 [flink-akka.actor.default-dispatcher-110] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService cisco - Cannot reconnect to job a0d922bbf1c20ed9417415827c32e1a3 because it is not registered.
    

    Please help me figure out what I am doing wrong, and let me know if any further information is needed.

    Below is the code for the BroadcastProcessFunction and the Cassandra sink, which I use to persist incoming signals for auditing.

    ================================Source Function To Read Patterns from API Call=================================================
    public class PatternSource extends RichSourceFunction<Map<String, Map<String, Pattern>>> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Map<String, Map<String, Pattern>>> ctx) throws Exception {
            // getGlobalJobParameters() returns a GlobalJobParameters object, not a String;
            // read the URL out of its map form (the "patternUrl" key is assumed here).
            Map<String, String> params =
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
            String patternUrl = params.get("patternUrl");
            Map<String, Map<String, Pattern>> patterns = getPatternData(patternUrl);
            ctx.collect(patterns);
            while (isRunning) {
                Thread.sleep(10000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    =================================================================================================================================
    
    
    ====================================================BroadcastProcessFunction Class================================================
    
    public class PatternDetection extends BroadcastProcessFunction<Tuple2<String, InputSignal>, Tuple2<String, Map<String, Pattern>>, Tuple2<String, InputSignal>> {

        // Key: signal source name; value: the patterns for that source.
        public static final MapStateDescriptor<String, Map<String, Pattern>> patternPatternDescriptor =
                new MapStateDescriptor<>("PatternPatternDescriptor",
                        BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, Pattern.class));

        @Override
        public void processElement(Tuple2<String, InputSignal> input, ReadOnlyContext ctx, Collector<Tuple2<String, InputSignal>> out) throws Exception {
            InputSignal signal = input.f1;
            JSONObject inputSignalPayload = new JSONObject(signal.getSignalPayload());
            String sourceName = signal.getSignalHeader().getSignalSource().toUpperCase();
            Map<String, Pattern> patternList = ctx.getBroadcastState(patternPatternDescriptor).get(sourceName);
            if (patternList == null) {
                return; // no patterns broadcast yet for this source
            }

            for (Pattern pattern : patternList.values()) {
                String patternDefinition = pattern.getPatternDefinition();

                /* Implemented my custom JSON data matcher */
                Matcher<?> jsonMatcher = this.buildMatcher(patternDefinition);
                if (jsonMatcher != null && jsonMatcher.matches(Arrays.asList(inputSignalPayload))) {
                    ctx.output(validSignalOutput, inputSignalPayload);
                }
            }
        }

        @Override
        public void processBroadcastElement(Tuple2<String, Map<String, Pattern>> patternCondition, Context ctx, Collector<Tuple2<String, InputSignal>> out) throws Exception {
            String signalSource = patternCondition.f0.toUpperCase();
            // Store the broadcast patterns so that processElement can read them later.
            ctx.getBroadcastState(patternPatternDescriptor).put(signalSource, patternCondition.f1);
        }
    }
    
    ======================================================================================================================================
    
    
    
    ====================================================Cassandra Sink====================================================================
    
        public static void createInputSignalSink(DataStream<InputSignalSignalHistory> dataStream, Properties properties, int parallelism) {
            try {
                log.info(LogMessageBuilder.buildLogMessage("Inserting InputSignal signal history to cassandra database"));
                CassandraSink.addSink(dataStream).setClusterBuilder(buildClusterBuilder(properties)).setMapperOptions(() -> {
                    return new Option[]{Option.saveNullFields(true)};
                }).build().setParallelism(parallelism);
                log.info(LogMessageBuilder.buildLogMessage("Cassandra sink cluster builder is ready"));
            } catch (Exception exp) {
                log.error(LogMessageBuilder.buildLogMessage("Failed to build Cassandra sink"), exp);
            }
        }
    =====================================================================================================================================
    
  • 1 answer

    虞滨海
    2023-03-14

    I read that flink maintains state internally even if no state is explicitly implemented

    Some built-in operators do maintain state implicitly, for example:

    • some sources and sinks
    • windows
    • CEP
    • reduce, sum, max, min

    From the stack trace you shared, the checkpoint appears to contain state for a source operator that cannot be restored; presumably your custom source.

    It would be easier to diagnose the problem if you shared the custom source code, but I would start by checking whether you have implemented the CheckpointedFunction interface correctly, in particular the initializeState(FunctionInitializationContext context) method.
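    For reference, here is a minimal sketch of a source that implements CheckpointedFunction in the Flink 1.10 style. It is illustrative only; the class name and the "seenCount" state are assumptions, not taken from the job above:

    ```java
    // Hedged sketch: a minimal stateful source for Flink 1.10. The state is
    // declared in initializeState and written in snapshotState; on recovery,
    // isRestored() is true and the ListState already holds the checkpointed values.
    public class CheckpointedPatternSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        private volatile boolean isRunning = true;
        private transient ListState<Long> checkpointedCount; // operator state handle
        private long count;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedCount = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("seenCount", Long.class));
            if (context.isRestored()) {
                for (Long c : checkpointedCount.get()) {
                    count = c;
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Called on every checkpoint: replace the stored value with the current one.
            checkpointedCount.clear();
            checkpointedCount.add(count);
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (isRunning) {
                // Emit and update state under the checkpoint lock so that
                // snapshots see a consistent count.
                synchronized (ctx.getCheckpointLock()) {
                    count++;
                    ctx.collect("record-" + count);
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
    ```

    Even if your source keeps no business state of its own, implementing this interface explicitly makes the operator state that Flink snapshots for the source well-defined, which matters when the restore path deserializes it.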
