当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:ProcessWindowFunction不适用

封弘伟
2023-03-14

我想在Apache Flink项目中使用ProcessWindowFunction。但我在使用process函数时遇到了一些错误,请参见下面的代码片段

错误是:

方法进程(ProcessWindowFunction, R, Tuple, TimeWindow

我的程序:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());

我的处理窗口功能:

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }

}

共有2个答案

方坚壁
2023-03-14

Fabian说:)使用Tuple应该可以工作,但确实会在您的ProcessWindowFunction中涉及一些丑陋的类型转换。使用KeySelector很容易,并且会产生更简洁的代码。例如。

.keyBy(new KeySelector<Tuple2<String,JsonObject>, String>() {

    @Override
    public String getKey(Tuple2<String, JsonObject> in) throws Exception {
        return in.f0;
    }
})

然后,通过上面的操作,您可以定义一个ProcessWindowFunction,如:

public class MyProcessWindows extends ProcessWindowFunction<Tuple2<String, JsonObject>, Tuple2<String, String>, String, TimeWindow> {
路雅懿
2023-03-14

问题可能是ProcessWindowFunction的泛型类型。

您正在按位置引用按键(keyBy(0))。因此,编译器无法推断其类型(字符串),您需要将ProcessWindowFunction更改为:

private class MyProcessWindows 
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>

通过将String替换为Tuple,您现在有了一个可以强制转换为Tuple1的键的通用占位符

public void process(
    Tuple key, 
    Context context, 
    Iterable<Tuple2<String, JSONObject>> input, 
    Collector<Tuple2<String, String>> out) throws Exception {

  String sKey = (String)((Tuple1)key).f0;
  ...
}

如果您定义了KeySelector,您可以避免强制转换并使用正确的类型

 类似资料:
  • 问题内容: 谁能解决我遇到的图形问题。这段代码根本不应用setPolyToPoly ..它会进行Camera旋转,但不会进行polyToPoly转换..我不明白为什么。 问题答案: 该样本并不完全适合您的问题,但可能会为您指明正确的方向。在此示例中,将矩阵应用于放置在透视图中的位图。如果将我与您的代码段进行比较,则可以设置多边形,但它不会应用于相机。

  • 问题内容: 我正在尝试学习JavaFX,并且已经编写了下面显示的代码,但是我似乎在使用以下代码行时遇到麻烦: 在强调setOnAction的位置并显示此错误: 我究竟做错了什么? 问题答案: 您已经导入了awt事件监听器,只需更改此行代码 有了这个 你也可以像这样使用lambda表达式

  • 我正在尝试学习JavaFX,我已经编写了下面所示的代码,但这行代码似乎有问题: 其中它在setOnAction下划线,并打印此错误: 我做错了什么?

  • 问题内容: 嗨,我只是简单地尝试在www.example.com上获取h1标签,该标签显示为“ Example Domain”。该代码适用于http://www.example.com,但不适用于https://www.exmaple.com。我该如何解决这个问题?谢谢 问题答案: PhantomJSDriver不支持(所有)DesiredCapabilities。 你会需要: 记录在这里:htt

  • 所以我使用这种方法写入文件,它在windows上运行完全正常,但在mac上运行时,它会创建文件,但它们是空的。 我知道数据是正确的,因为它打印正确。感谢您的任何帮助,这真的让我绊倒了。

  • 问题内容: 我的FXML中有一个DatePicker,我需要日期才能将其插入到SQL数据库中。我想格式化日期,但是不起作用。 这是我得到的错误。 我还是个初学者。我在过去的3到4个月中都拥有Java。我正在尽力改善。 问题答案: 我必须为Datepicker使用String转换器。 它工作得很好。我必须使用参数,因为我当前正在使用5个Datepickers。