当前位置: 首页 > 面试题库 >

Flink键控流键为空

莫誉
2023-03-14
问题内容

我试图在Flink中的KeyedStream上执行映射操作:

stream.map(new JsonToMessageObjectMapper())
                    .keyBy("keyfield")
                    .map(new MessageProcessorStateful())

JsonToObjectMapper运算符的输出是 MessageObject 类的POJO,它具有String字段“ keyfield
”。然后,将流键入此字段。

MessageProcessorStateful是一个RichMapFunction,如下所示:

public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {

    private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
    ...
    @Override
    public void open(Configuration config) throws Exception {
        MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
                    new MapStateDescriptor<>(
                        "state",                                                                                     // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {}) ); // type information
                    state = getRuntimeContext().getMapState(descriptor);

        state.put(...); // Insert a key, value here. Exception here!

    }
}

该代码引发NullPointer异常:

Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)
    at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

尽管我已经验证了’keyfield’始终是有效的字符串,但KeyedStream之一的keyedState中的键似乎为null。根据Flink文档,休息似乎是正确的。知道发生了什么吗?


问题答案:

问题是您尝试访问方法中的键控状态open()

键控状态为每个键维护一个状态实例。在您的示例中,您正在使用MapState。因此MapState,每个密钥都有一个实例。访问状态时,您将始终获得与当前处理记录的键对应的状态实例。在一个MapFunction(如您的示例中)这将是传递给该map()方法的记录。

由于open()未与记录一起调用,因此当前键open()null,因此无法访问键控状态。



 类似资料:
  • 我对闪身是个新手。我正在尝试在我的应用程序中启用检查点和状态。我从Flink文档中看到了我们是如何存储键控状态的。但是我想知道我们是否可以存储非键控状态(的状态)

  • 我有一个记录按顺序到达的流。我应用了一个map函数,然后在上面应用了keyBy函数。在每个具有相同键的记录流中,记录的顺序是否会保持? 在按顺序排列记录方面也存在类似的问题。但是我对这里给出的答案和下面从链接“https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html”中复制

  • 我需要用(userId)键控的缓慢变化的来丰富由(userId,startTripTimestamp)键控的快速变化的。 我使用带有数据流API的Flink1.8。我考虑两种方法: > 广播并通过用户ID和最新时间戳连接流。它是否等同于TableAPI中的DynamicTable?我可以看到这种解决方案的一些缺点:需要放入每个工作节点的RAM中,这会增加RAM的利用率,因为需要存储在每个工作节点的

  • 我有一份Flink的工作,我尝试在后端类型RockDB中使用键控流状态函数(MapState), MyRichMapFunction是一个有状态函数,它扩展了RichMapFunction,RichMapFunction有以下代码, 将来,我想重新缩放并行度(从2到4),所以我的问题是,如何实现可重新缩放的键控状态,以便在更改并行度后,我可以将相应的缓存键控数据获取到其相应的任务槽中。我试图探索这

  • 我正在尝试对Flink中的KeyedStream执行映射操作: JsonToObjectMapper运算符的输出是类MessageObject的POJO,它有一个字符串字段'keyfield'。然后在该字段上键入流。 代码抛出NullPointer异常: 似乎其中一个KeyedStream的keyedState中的键为null,尽管我已经验证了'keyfield'始终是有效字符串。根据Flink文

  • 这是关于连接键控流的一个非常基本的问题。 如果我有两个具有共享相同逻辑键的相关事件的流,并且这些流正在连接(使用该键进行逻辑连接),并且所有这些流都以并行方式运行 这是一个关于医院患者流的虚构示例——温度流和心跳流。我们希望使用和通过患者的id加入这两个流。 假设它以并行度=3运行,操作员任务A、B、C,并且它们都在不同的物理机器中运行。 Flink将保证患者“JohnDoe”的所有温度事件都将在