当前位置: 首页 > 知识库问答 >
问题:

使用Flink处理器API恢复检查点状态失败

危寒
2023-03-14

主程序正在消费kafka事件,然后过滤-

object StateReader extends App {

  val path = "file://...."

  val env = ExecutionEnvironment.getExecutionEnvironment

  val chk = Savepoint.load(env.getJavaEnv, path, new FsStateBackend(path))

  val ds = chk.readKeyedState("cep", new CepOperatorReadFunction, TypeInformation.of(classOf[KEY]), TypeInformation.of(classOf[VALUE]))
  println(ds.count())

}

class CepOperatorReadFunction extends KeyedStateReaderFunction[KEY, VALUE] {
  override def open(parameters: Configuration): Unit = {

  }
  override def readKey(k: KEY, context: KeyedStateReaderFunction.Context, collector: Collector[VALUE]): Unit = {

  }//end readKey
}//end class CepOperatorReadFunction

但是我得到了以下例外:

Caused by: java.lang.IllegalStateException: Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:120)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 13 more

以下是flink-conf.yaml中的一些配置

state.backend: rocksdb
state.checkpoints.dir: hdfs:///.../checkpoints
state.savepoints.dir: hdfs:///.../savepoints
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.localdir: /var/lib/.../rocksdb
execution.checkpointing.interval: 900000
execution.checkpointing.timeout: 600000
execution.checkpointing.unaligned: true
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0

任何想法为什么会发生异常以及如何解决问题?

谢谢

共有1个答案

白嘉石
2023-03-14

没有开箱即用的支持可以轻松读取CEP运算符的状态。因此,要实现您的KeyedStateReaderFunction,您必须深入研究CEP实现,找到使用的ValueStates和MapStates,并实现使用这些相同状态描述符的阅读器。

 类似资料:
  • 我可以在文档中看到: Flink目前只为没有迭代的作业提供处理保证。对迭代作业启用检查点会导致异常。为了在迭代程序上强制检查点,用户需要在启用检查点时设置一个特殊的标志:env.enablecheckpointing(interval,force=true)。 如果是一个而不是一个(这意味着它也可以保存状态),会有什么变化吗?

  • 一、状态分类 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用: 具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State: 2.1 算子状态 算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。官方

  • 我们有一个具有 20 个独立管道的流式处理作业,每个管道具有一个或多个 Kafka 主题源。 当我们使用一个新的jar(我又添加了一个管道)重新启动作业,并且AllowNonRestoredState=true时,我们注意到从检查点恢复Operatorstate的奇怪行为。 我们当前用于添加管道的配置是静态的,我们基本上正在更改代码以添加任何新管道。 我们没有为任何运算符设置任何UID。 当我们从

  • 1)以上假设是否正确。2)当发生故障时,滚动窗口有状态是否有意义,我们从最后一个kafka分区提交的偏移量开始。3)当滚动窗口有状态时,这个状态什么时候可以被flink使用。4)为什么检查点和保存点的状态大小不同。5)当发生故障时,flink总是从sorce运算符开始。对吗?

  • 我正在检查Flink Sql Table与kafka连接器是否可以在EXACTLY_ONCE模式下执行,我的方法是创建一个表,设置合理的检查点间隔,并在event_time字段上使用简单的翻滚函数,最后重新启动我的程序。 以下是我的详细进度: 1:创建一个Kafka表 2:启动我的 Flink 作业,如下所示配置 3:执行我的sql 如我们所见,翻转窗口间隔为5分钟,检查点间隔为30秒,每个翻转窗

  • 我对闪身是个新手。我正在尝试在我的应用程序中启用检查点和状态。我从Flink文档中看到了我们是如何存储键控状态的。但是我想知道我们是否可以存储非键控状态(的状态)