主程序正在消费kafka事件,然后过滤-
object StateReader extends App {
val path = "file://...."
val env = ExecutionEnvironment.getExecutionEnvironment
val chk = Savepoint.load(env.getJavaEnv, path, new FsStateBackend(path))
val ds = chk.readKeyedState("cep", new CepOperatorReadFunction, TypeInformation.of(classOf[KEY]), TypeInformation.of(classOf[VALUE]))
println(ds.count())
}
class CepOperatorReadFunction extends KeyedStateReaderFunction[KEY, VALUE] {
override def open(parameters: Configuration): Unit = {
}
override def readKey(k: KEY, context: KeyedStateReaderFunction.Context, collector: Collector[VALUE]): Unit = {
}//end readKey
}//end class CepOperatorReadFunction
但是我得到了以下例外:
Caused by: java.lang.IllegalStateException: Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:120)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 13 more
以下是flink-conf.yaml中的一些配置
state.backend: rocksdb
state.checkpoints.dir: hdfs:///.../checkpoints
state.savepoints.dir: hdfs:///.../savepoints
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.localdir: /var/lib/.../rocksdb
execution.checkpointing.interval: 900000
execution.checkpointing.timeout: 600000
execution.checkpointing.unaligned: true
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
任何想法为什么会发生异常以及如何解决问题?
谢谢
没有开箱即用的支持可以轻松读取CEP运算符的状态。因此,要实现您的KeyedStateReaderFunction
,您必须深入研究CEP实现,找到使用的ValueState
s和MapState
s,并实现使用这些相同状态描述符的阅读器。
我可以在文档中看到: Flink目前只为没有迭代的作业提供处理保证。对迭代作业启用检查点会导致异常。为了在迭代程序上强制检查点,用户需要在启用检查点时设置一个特殊的标志:env.enablecheckpointing(interval,force=true)。 如果是一个而不是一个(这意味着它也可以保存状态),会有什么变化吗?
一、状态分类 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用: 具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State: 2.1 算子状态 算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。官方
我们有一个具有 20 个独立管道的流式处理作业,每个管道具有一个或多个 Kafka 主题源。 当我们使用一个新的jar(我又添加了一个管道)重新启动作业,并且AllowNonRestoredState=true时,我们注意到从检查点恢复Operatorstate的奇怪行为。 我们当前用于添加管道的配置是静态的,我们基本上正在更改代码以添加任何新管道。 我们没有为任何运算符设置任何UID。 当我们从
1)以上假设是否正确。2)当发生故障时,滚动窗口有状态是否有意义,我们从最后一个kafka分区提交的偏移量开始。3)当滚动窗口有状态时,这个状态什么时候可以被flink使用。4)为什么检查点和保存点的状态大小不同。5)当发生故障时,flink总是从sorce运算符开始。对吗?
我正在检查Flink Sql Table与kafka连接器是否可以在EXACTLY_ONCE模式下执行,我的方法是创建一个表,设置合理的检查点间隔,并在event_time字段上使用简单的翻滚函数,最后重新启动我的程序。 以下是我的详细进度: 1:创建一个Kafka表 2:启动我的 Flink 作业,如下所示配置 3:执行我的sql 如我们所见,翻转窗口间隔为5分钟,检查点间隔为30秒,每个翻转窗
我对闪身是个新手。我正在尝试在我的应用程序中启用检查点和状态。我从Flink文档中看到了我们是如何存储键控状态的。但是我想知道我们是否可以存储非键控状态(的状态)