当前位置: 首页 > 知识库问答 >
问题:

闪烁键控流密钥为空

范浩荡
2023-03-14

我正在尝试对Flink中的KeyedStream执行映射操作:

stream.map(new JsonToMessageObjectMapper())
                    .keyBy("keyfield")
                    .map(new MessageProcessorStateful())

JsonToObjectMapper运算符的输出是类MessageObject的POJO,它有一个字符串字段'keyfield'。然后在该字段上键入流。

public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {

    private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
    ...
    @Override
    public void open(Configuration config) throws Exception {
        MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
                    new MapStateDescriptor<>(
                        "state",                                                                                     // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {}) ); // type information
                    state = getRuntimeContext().getMapState(descriptor);

        state.put(...); // Insert a key, value here. Exception here!

    }
}

代码抛出NullPointer异常:

Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)
    at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

似乎其中一个KeyedStream的keyedState中的键为null,尽管我已经验证了'keyfield'始终是有效字符串。根据Flink文档,Rest似乎是正确的。知道这是怎么回事吗?

共有1个答案

陶裕
2023-03-14

问题是您试图访问open()方法中的键控状态。

Keyed state为每个密钥维护一个状态实例。在您的示例中,您使用的是mapstate。因此每个键都有一个mapstate实例。在访问状态时,您将始终获得与当前处理的记录的键相对应的状态实例。在MapFunction(如您的示例)中,这将是传递给Map()方法的记录。

由于未使用记录调用open(),因此open()中的当前键为null并且无法访问键控状态。

 类似资料:
  • 我有一个关于在Kinesis流中分片数据的问题。我想在向我的kinesis流发送用户数据时使用一个随机分区键,这样碎片中的数据是均匀分布的。为了使这个问题更简单,我想通过在Flink应用程序中键入用户ID来聚合用户数据。

  • 我的目标是有一个Flink流程序,保留最后的N个id,其中id是从事件中提取的。接收器是一个Cassandra存储区,因此可以随时获取ID列表。重要的是,卡桑德拉在每一次事件发生时都要立即得到最新消息。

  • 这是因为在开发模式下,为了通过 Webpack 实现热加载,CSS代码是打包在 JavaScript 代码中,并动态打到页面中去,从而元素重绘引起了闪烁。 不用担心,在生产模式下,CSS代码会单独打包至独立的文件并置于head标签内,不会出现页面闪烁的现象。

  • 问题内容: 我试图在Flink中的KeyedStream上执行映射操作: JsonToObjectMapper运算符的输出是 MessageObject 类的POJO,它具有String字段“ keyfield ”。然后,将流键入此字段。 MessageProcessorStateful是一个RichMapFunction,如下所示: 该代码引发NullPointer异常: 尽管我已经验证了’ke

  • 作业并行性(4,8,16):[自动生成源]-->[Map1]-->[滚动窗口(10s)]-->[Map2]-->[接收器] Flink窗口性能eps 4p、8p、16p 作业以上的性能最高达到了每秒50k+-左右,不管我如何将集群缩放成4-16的并行度。 闪烁性能无窗口4p、8p 我已经删除了窗口的逻辑,以消除瓶颈性能的应用程序逻辑,但似乎窗口仍然导致我的整个流性能下降,即使该窗口只是一个通过函数

  • 我读取一个简单的JSON字符串作为输入,并基于两个字段和对流进行键控。但是KeyBy为的不同值生成相同的键控流,但为和的特定组合生成相同的键控流。 输入: 这是我的Flink代码的核心逻辑: