当前位置: 首页 > 知识库问答 >
问题:

如何理解Apache Flink中的窗口机制

彭礼骞
2023-03-14

我正在学习如何使用Flink处理流数据。

根据我的理解,我可以多次使用函数map进行各种转换。

表示数据源不断向Flink发送字符串。所有字符串都是JSON格式的数据,如下所示:

{"name":"titi","age":18}
{"name":"toto","age":20}
...

下面是我的代码:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>()))
    .build();

// Convert String to Json Object
// MyJson is a POJO class, defined by me
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
    .map(new MapFunction<String, MyJson>() {
    @Override
    public MyJson map(String s) throws Exception {
        MyJson myJson = JSON.parseObject(s, MyJson.class);
        return myJson;
        }
    });
// Convert MyJson Object to String and extract what I need
DataStream<String> valueInJson = jsonStream
    .map(new MapFunction<MyJson, String>() {
        @Override
        public String map(MyJson myJson) throws Exception {
            return myJson.getName().toString();
        }
    });
valueInJson.print();
env.execute("StreamingJob");

正如您所看到的,我的示例非常简单:获取并反序列化数据-->将string转换为Json对象-->将Json对象转换为string并获取所需内容(这里只需要name)。

就目前而言,似乎一切都很好。我确实从日志文件中获得了预期的输出。

不过,我知道Flink为我们提供了一个强大的功能:窗口。

我想知道如何在我的例子中使用这个机制。

例如,如果我想用一些2秒的窗口拆分数据流,如何编码?

DataStream<String> valueInJson = jsonStream
    .timeWindow(Time.seconds(2))
    .map(new MapFunction<MyJson, String>() {
        @Override
        public String map(MyJson myJson) throws Exception {
            return myJson.toString();
        }
    });
valueInJson.print();

但是,我输入了:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;

为什么我会得到这个错误?我用错窗户了吗?我是不是错过了关于Flink的理解?

共有1个答案

巢皓君
2023-03-14

出现错误是因为timewindow()函数是在keyedstream中定义的,而不是在datastream中,因为它是基于键的操作。在您的情况下,将timewindow()更改为timeWindowall()就足够了。

 类似资料:
  • 如何在ApacheFlink中为会话窗口分配id? 最后,我希望在会话窗口打开时,使用会话窗口id逐个充实事件(我不希望等到窗口关闭后再发出充实事件)。 我尝试使用AggregateFunction来实现这一点,但是我认为merge()并没有像我所期望的那样工作。它似乎是用于合并窗口而不是窗格(触发触发)。在我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态! 会话窗口ID将是落入窗口的

  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl

  • 2.4 理解TTY窗口 用户可以通过TTY窗口将程序的输入传给被调试的程序。这个窗口与GDB窗口类似,除了在TTY窗口中输入的数据将被直接传给被调试的程序。参见第七章。 您会看到,在TTY窗口和被调试的程序窗口之间有一个tty设备。因此如果被调试的程序使用readline等行缓冲输入,则命令行输入是可以被编辑的。这个tty设备也会被通过TTY窗口当作程序的终端输出。您可以在TTY窗口和GDB窗口之

  • 2.2 理解GDB窗口 CGDB通过GDB窗口让用户直接操作gdb。如果您想要将一个命令发送给gdb,只要将命令输入进GDB窗口即可。使用CGDB中的GDB窗口和在命令行下使用gdb是完全一样的。 在这个窗口中输入的一部分快捷键会被CGDB截获并处理,而非直接发送给gdb。这些快捷键的列表在3.2节 类似于终端操作,CGDB将会将您连续输入的多个命令缓存起来。因此当您在一个命令完成之前又输入了多个

  • 2.1 理解代码窗口 您可以通过代码窗口查看当前被调试程序的源代码。CGDB只能同时显示一个源文件。当用户调试程序的时候,通过 next 和 step 命令,CGDB将会更新源代码以及行号,以此提醒您调试进行到了何处。 CGDB有几个新特性能让调试比使用旧的GDB更方便。其中,当您在调试C,C++或ADA程序的时候,源代码是高亮的。这个特性可以让您更加快速的找到源文件中的特定代码。如果您需要让CG