当前位置: 首页 > 知识库问答 >
问题:

Flink可扩展键控流有状态功能

乔凯康
2023-03-14

我有一份Flink的工作,我尝试在后端类型RockDB中使用键控流状态函数(MapState),

environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")

MyRichMapFunction是一个有状态函数,它扩展了RichMapFunction,RichMapFunction有以下代码,

public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
    private transient MapState<String, Boolean> cache;
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
        cache = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public MyEvent map(MyEvent value) throws Exception {
        if (cache.contains(value.getEventId())) {
            value.setIsSeenAlready(Boolean.TRUE);
            return value;
        }
        value.setIsSeenAlready(Boolean.FALSE);
        cache.put(value.getEventId(), Boolean.TRUE)
        return value;
    }
}

将来,我想重新缩放并行度(从2到4),所以我的问题是,如何实现可重新缩放的键控状态,以便在更改并行度后,我可以将相应的缓存键控数据获取到其相应的任务槽中。我试图探索这一点,在这里我找到了一份文档。据此,可以通过使用ListCheckPointed接口来实现可重新伸缩的操作员状态,该接口提供了快照状态/恢复状态方法。但不确定如何实现可重新伸缩的键控状态(MyRichMapFunction)?我是否需要为MyRichMapFunction类实现ListCheckPointed接口?如果是,我如何根据restoreState方法上的新并行密钥哈希重新分配缓存(我的MapState将在启用TTL的情况下保存大量密钥,假设在任何时间点最多可以保存10亿个密钥)?有人能帮我一下吗?或者你能给我举一个很好的例子吗。

共有1个答案

姬凡
2023-03-14

您编写的代码已经可以重新缩放;Flink的托管键控状态可通过设计进行重新缩放。通过重新平衡对实例的密钥分配,可以重新调整已设置关键帧的状态。(您可以将键控状态视为分片键/值存储。从技术上讲,所发生的事情是使用一致的哈希将键映射到键组,每个并行实例负责一些键组。重缩放仅涉及在实例之间重新分配键组。)

ListCheckpointed接口用于非键控上下文中使用的状态,因此它不适合您所做的操作。还要注意的是,Flink 1.11中会弃用ListCheckpointed,而使用更通用的CheckpointedFunction。

还有一件事:如果MyKeyExtractor是按值键入的。getEventId(),则可以使用ValueState

其中大部分内容都在Flink文档的“动手训练”下进行了讨论,其中包括一个与您所做的非常接近的示例。

 类似资料:
  • 我对闪身是个新手。我正在尝试在我的应用程序中启用检查点和状态。我从Flink文档中看到了我们是如何存储键控状态的。但是我想知道我们是否可以存储非键控状态(的状态)

  • 我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?

  • 问题内容: 我试图在Flink中的KeyedStream上执行映射操作: JsonToObjectMapper运算符的输出是 MessageObject 类的POJO,它具有String字段“ keyfield ”。然后,将流键入此字段。 MessageProcessorStateful是一个RichMapFunction,如下所示: 该代码引发NullPointer异常: 尽管我已经验证了’ke

  • 我使用键控进程函数来使用RocksDB状态后端。我想为同一把钥匙持有两种不同的状态; 状态1类型:ValueState[字符串] 状态2类型:MapState[String, Long] 在这种情况下,我必须在同一个键控进程函数中创建两个状态描述符。这在flink中可能吗?

  • 扩展说明 检查服务依赖各种资源的状态,此状态检查可同时用于 telnet 的 status 命令和 hosting 的 status 页面。 扩展接口 org.apache.dubbo.common.status.StatusChecker 扩展配置 <dubbo:protocol status="xxx,yyy" /> <!-- 缺省值设置,当<dubbo:protocol>没有配置status

  • 我正在编写一个Flink应用程序,它使用kafka主题中的时间序列数据。时间序列数据包含度量名称、标记键值对、时间戳和值等组件。我已经创建了一个滚动窗口来根据度量键(度量名称、键值对和时间戳的组合)聚合数据。这里是主流看起来像 我还想检查是否有任何指标在上面的窗口外迟到。我想检查有多少指标延迟到达,并计算延迟指标与原始指标相比的百分比。我正在考虑使用flink的“允许延迟”功能将延迟指标发送到不同