当前位置: 首页 > 知识库问答 >
问题:

在Flink中,当对键控流应用进程函数时,我如何访问键?

岑毅庵
2023-03-14

在Flink中,我有一个键控流,我正在对它应用一个进程函数。

myDataStream
  .keyBy(new MyKeySelector())
  .process(new FooBarProcessFunction())

我的键选择器看起来像这样...

public class MyKeySelector implements KeySelector<FooBar, FooKey>

public FooKey getKey (FooBar value) {
   return new FooKey (value);
}

FooBarProcessFunction看起来像这样...

public class FooBarProcessFunction extends ProcessFunction<FooBar, TransformedFooBar> {

    public void processElement(FooBar newFooBar, Context ctx, Collector<FooBar> out) {
        //do something with newFooBar
        // *****but I also want to know the Key (FooKey) here***** 
    }
}

在FooBarProcessFunction中,我希望获得由MyKeySelector的getKey方法创建的键。那可行吗?

目前,我正在使用一种变通方法,其中我基本上在processElement函数中重新创建键。但如果我能避免这样做,那就太理想了。

共有2个答案

令狐烨烨
2023-03-14

如果您打开键控流窗口并应用ProcessWindowFunction ,似乎可以获得键。

Apache flink文档中有一些这样的例子。请注意,ProcessWindowFunction效率不高,应与ReduceFunctionAggregateFunctionFoldFunction组合使用。

希望这有帮助!

胥和悌
2023-03-14

为了从进程函数访问密钥,您应该使用keyedprocessfunction

你的例子变成:

public class FooBarProcessFunction extends KeyedProcessFunction<FooKey, FooBar, TransformedFooBar> {

public void processElement(FooBar newFooBar, Context ctx, Collector<FooBar> out) {
    //do something with newFooBar
    // *****but I also want to know the Key (FooKey) here***** 
   ctx.getCurrentKey
}

}

 类似资料:
  • 我想做的是对每个窗口批处理的每个分区应用一个通用函数,也许应用一个复杂的处理算法或类似的东西。我已经在DataStream API中看到了该方法的应用,但我不理解它是如何工作的。在Flink API中,它说它的用法与Scala中的用法相同: 有人能解释一下apply方法是做什么的或者是如何使用的吗?Scala中的示例会更好。apply方法是否符合我的要求?

  • 我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?

  • 问题内容: 我试图在Flink中的KeyedStream上执行映射操作: JsonToObjectMapper运算符的输出是 MessageObject 类的POJO,它具有String字段“ keyfield ”。然后,将流键入此字段。 MessageProcessorStateful是一个RichMapFunction,如下所示: 该代码引发NullPointer异常: 尽管我已经验证了’ke

  • 我对流中的事件进行了键控,我希望通过键来累积,直到超时(例如,5分钟),然后处理累积到该点的事件(忽略该键之后的所有内容,但首先是第一件事)。 我是一个新的Flink,但从概念上来说,我认为我需要一些类似下面代码的东西。 如何在Flink中完成键控窗口超时?

  • 我想要一个可以跟踪我所有应用程序使用时间的应用程序。如何在颤振中实现这一点?要使用哪些软件包?

  • 我需要用(userId)键控的缓慢变化的来丰富由(userId,startTripTimestamp)键控的快速变化的。 我使用带有数据流API的Flink1.8。我考虑两种方法: > 广播并通过用户ID和最新时间戳连接流。它是否等同于TableAPI中的DynamicTable?我可以看到这种解决方案的一些缺点:需要放入每个工作节点的RAM中,这会增加RAM的利用率,因为需要存储在每个工作节点的