我正在尝试对Flink中的KeyedStream执行映射操作:
stream.map(new JsonToMessageObjectMapper())
.keyBy("keyfield")
.map(new MessageProcessorStateful())
JsonToObjectMapper运算符的输出是类MessageObject的POJO,它有一个字符串字段'keyfield'。然后在该字段上键入流。
public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {
private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
...
@Override
public void open(Configuration config) throws Exception {
MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
new MapStateDescriptor<>(
"state", // the state name
TypeInformation.of(new TypeHint<String>() {}),
TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {}) ); // type information
state = getRuntimeContext().getMapState(descriptor);
state.put(...); // Insert a key, value here. Exception here!
}
}
代码抛出NullPointer异常:
Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)
at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
似乎其中一个KeyedStream的keyedState中的键为null,尽管我已经验证了'keyfield'始终是有效字符串。根据Flink文档,Rest似乎是正确的。知道这是怎么回事吗?
问题是您试图访问open()
方法中的键控状态。
Keyed state为每个密钥维护一个状态实例。在您的示例中,您使用的是mapstate
。因此每个键都有一个mapstate
实例。在访问状态时,您将始终获得与当前处理的记录的键相对应的状态实例。在MapFunction
(如您的示例)中,这将是传递给Map()
方法的记录。
由于未使用记录调用open()
,因此open()
中的当前键为null
并且无法访问键控状态。
我有一个关于在Kinesis流中分片数据的问题。我想在向我的kinesis流发送用户数据时使用一个随机分区键,这样碎片中的数据是均匀分布的。为了使这个问题更简单,我想通过在Flink应用程序中键入用户ID来聚合用户数据。
我的目标是有一个Flink流程序,保留最后的N个id,其中id是从事件中提取的。接收器是一个Cassandra存储区,因此可以随时获取ID列表。重要的是,卡桑德拉在每一次事件发生时都要立即得到最新消息。
这是因为在开发模式下,为了通过 Webpack 实现热加载,CSS代码是打包在 JavaScript 代码中,并动态打到页面中去,从而元素重绘引起了闪烁。 不用担心,在生产模式下,CSS代码会单独打包至独立的文件并置于head标签内,不会出现页面闪烁的现象。
问题内容: 我试图在Flink中的KeyedStream上执行映射操作: JsonToObjectMapper运算符的输出是 MessageObject 类的POJO,它具有String字段“ keyfield ”。然后,将流键入此字段。 MessageProcessorStateful是一个RichMapFunction,如下所示: 该代码引发NullPointer异常: 尽管我已经验证了’ke
作业并行性(4,8,16):[自动生成源]-->[Map1]-->[滚动窗口(10s)]-->[Map2]-->[接收器] Flink窗口性能eps 4p、8p、16p 作业以上的性能最高达到了每秒50k+-左右,不管我如何将集群缩放成4-16的并行度。 闪烁性能无窗口4p、8p 我已经删除了窗口的逻辑,以消除瓶颈性能的应用程序逻辑,但似乎窗口仍然导致我的整个流性能下降,即使该窗口只是一个通过函数
我读取一个简单的JSON字符串作为输入,并基于两个字段和对流进行键控。但是KeyBy为的不同值生成相同的键控流,但为和的特定组合生成相同的键控流。 输入: 这是我的Flink代码的核心逻辑: