在Flink中,我有一个键控流,我正在对它应用一个进程函数。
myDataStream
.keyBy(new MyKeySelector())
.process(new FooBarProcessFunction())
我的键选择器看起来像这样...
public class MyKeySelector implements KeySelector<FooBar, FooKey>
public FooKey getKey (FooBar value) {
return new FooKey (value);
}
FooBarProcessFunction看起来像这样...
public class FooBarProcessFunction extends ProcessFunction<FooBar, TransformedFooBar> {
public void processElement(FooBar newFooBar, Context ctx, Collector<FooBar> out) {
//do something with newFooBar
// *****but I also want to know the Key (FooKey) here*****
}
}
在FooBarProcessFunction中,我希望获得由MyKeySelector的getKey方法创建的键。那可行吗?
目前,我正在使用一种变通方法,其中我基本上在processElement函数中重新创建键。但如果我能避免这样做,那就太理想了。
如果您打开键控流窗口并应用ProcessWindowFunction
,似乎可以获得键。
Apache flink文档中有一些这样的例子。请注意,ProcessWindowFunction
效率不高,应与ReduceFunction
、AggregateFunction
或FoldFunction
组合使用。
希望这有帮助!
为了从进程函数访问密钥,您应该使用keyedprocessfunction
你的例子变成:
public class FooBarProcessFunction extends KeyedProcessFunction<FooKey, FooBar, TransformedFooBar> {
public void processElement(FooBar newFooBar, Context ctx, Collector<FooBar> out) {
//do something with newFooBar
// *****but I also want to know the Key (FooKey) here*****
ctx.getCurrentKey
}
}
我想做的是对每个窗口批处理的每个分区应用一个通用函数,也许应用一个复杂的处理算法或类似的东西。我已经在DataStream API中看到了该方法的应用,但我不理解它是如何工作的。在Flink API中,它说它的用法与Scala中的用法相同: 有人能解释一下apply方法是做什么的或者是如何使用的吗?Scala中的示例会更好。apply方法是否符合我的要求?
我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?
问题内容: 我试图在Flink中的KeyedStream上执行映射操作: JsonToObjectMapper运算符的输出是 MessageObject 类的POJO,它具有String字段“ keyfield ”。然后,将流键入此字段。 MessageProcessorStateful是一个RichMapFunction,如下所示: 该代码引发NullPointer异常: 尽管我已经验证了’ke
我对流中的事件进行了键控,我希望通过键来累积,直到超时(例如,5分钟),然后处理累积到该点的事件(忽略该键之后的所有内容,但首先是第一件事)。 我是一个新的Flink,但从概念上来说,我认为我需要一些类似下面代码的东西。 如何在Flink中完成键控窗口超时?
我想要一个可以跟踪我所有应用程序使用时间的应用程序。如何在颤振中实现这一点?要使用哪些软件包?
我需要用(userId)键控的缓慢变化的来丰富由(userId,startTripTimestamp)键控的快速变化的。 我使用带有数据流API的Flink1.8。我考虑两种方法: > 广播并通过用户ID和最新时间戳连接流。它是否等同于TableAPI中的DynamicTable?我可以看到这种解决方案的一些缺点:需要放入每个工作节点的RAM中,这会增加RAM的利用率,因为需要存储在每个工作节点的