当前位置: 首页 > 知识库问答 >
问题:

Flink:我如何使用键控状态?

钱卓君
2023-03-14

我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示:

inputStream
    .keyBy(new MyKeySelector())
    .process(new MyKeyedProcessFunction());

在KeyedProcessFunction中,我有一个状态变量:

public class MyKeyedProcessFunction extends KeyedProcessFunction<...> {

    private MapState<String, ...> state;

    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, ...> descriptor = new MapStateDescriptor<>(
            "keyed",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<...>() {
            }));

        state = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(... event, Context context, Collector<...> out) throws Exception {

        String key = context.getCurrentKey();

        ... keyedState;
        if (state.contains(key)) {
            keyedState = state.get(key);
        } else {
            keyedState = new ...();
        }
        ...
    }

...
}

我对此还这么陌生,我做错了什么?

共有1个答案

郜联
2023-03-14

这在作为Flink文档一部分的教程中有很好的介绍,在Ververica共享的免费Flink培训材料中也有更深入的介绍。这两个课程都利用了https://github.com/apache/flink-training中的练习,这将使所有这些更加清晰。

如果您的目标是每个键存储一个对象,那么您所需要的就是valueState ,它将创建一个分布在集群中的分片散列映射,为每个不同的键存储一个t类型的对象。mapstate用于需要为每个键存储一个hashmap时--例如,如果您希望为每个用户都有一个开放式属性哈希,给定一个由userid键控的流。

KeyedProcessFunction的实例在属于分配给运算符实例的密钥组的所有密钥上复用。在KeyedProcessFunctionOpen方法中,上下文中没有键;当您在那里实例化valueState对象时,您将取回内存中或本地磁盘上的hashmap句柄,这取决于您使用的后端状态。然后,当您在processElement()方法中调用state.value()state.update(...)时,上下文中有一个特定事件,当前事件的键被隐式地用于读取或写入状态HashMap中的适当条目。

此接口的设计目的是防止您试图操作无法在本地实例中访问的键的状态(由于该状态是在集群中分片的,因此不能保证该实例中除了当前事件的键之外的任何其他键的状态可用)。但不可否认的是,关键字在API中却不可见,却发挥着如此重要的作用,这是令人困惑的。

 类似资料:
  • 我正在使用在Flink中执行流计算。我为我的作业定义了一个扩展的类。假设我有一个通过键控的流a,和一个流B,它被广播给所有执行程序,以使用我定义的类处理a中的元素。我知道我可以在这个类的或中注册一个计时器,这样当它超时时,我可以通过调用来删除特定密钥组的关联状态。之后我在想,这个重点群体还存在吗? 例如,在流A中,一个新消息带有,我们生成了这样的密钥组及其关联状态。之后,如果出现另一个带有的消息,

  • 我对闪身是个新手。我正在尝试在我的应用程序中启用检查点和状态。我从Flink文档中看到了我们是如何存储键控状态的。但是我想知道我们是否可以存储非键控状态(的状态)

  • 我有一份Flink的工作,我尝试在后端类型RockDB中使用键控流状态函数(MapState), MyRichMapFunction是一个有状态函数,它扩展了RichMapFunction,RichMapFunction有以下代码, 将来,我想重新缩放并行度(从2到4),所以我的问题是,如何实现可重新缩放的键控状态,以便在更改并行度后,我可以将相应的缓存键控数据获取到其相应的任务槽中。我试图探索这

  • 问题内容: 我试图在Flink中的KeyedStream上执行映射操作: JsonToObjectMapper运算符的输出是 MessageObject 类的POJO,它具有String字段“ keyfield ”。然后,将流键入此字段。 MessageProcessorStateful是一个RichMapFunction,如下所示: 该代码引发NullPointer异常: 尽管我已经验证了’ke

  • 我正在编写一个Flink应用程序,它使用kafka主题中的时间序列数据。时间序列数据包含度量名称、标记键值对、时间戳和值等组件。我已经创建了一个滚动窗口来根据度量键(度量名称、键值对和时间戳的组合)聚合数据。这里是主流看起来像 我还想检查是否有任何指标在上面的窗口外迟到。我想检查有多少指标延迟到达,并计算延迟指标与原始指标相比的百分比。我正在考虑使用flink的“允许延迟”功能将延迟指标发送到不同

  • 在Flink中,我有一个键控流,我正在对它应用一个进程函数。 我的键选择器看起来像这样... FooBarProcessFunction看起来像这样... 在FooBarProcessFunction中,我希望获得由MyKeySelector的getKey方法创建的键。那可行吗? 目前,我正在使用一种变通方法,其中我基本上在processElement函数中重新创建键。但如果我能避免这样做,那就太