当前位置: 首页 > 知识库问答 >
问题:

时间窗口上的Flink应用函数

章飞虎
2023-03-14
DataStream<Tuple2<String,JSONObject>> MetaAlert = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new generateMetaAlert());




public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> {

        @Override
        public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2,
                Collector<Tuple2<String, JSONObject>> arg3) throws Exception {


        }

共有1个答案

柯波娃
2023-03-14

应用keyby函数时(不使用匿名类),自定义windowfunction(第三个字段)中的键类型应为tuple,因为编译器无法确定键的类型。这段代码编译时没有错误(考虑到我试图用伪代码填充空白处):

public class Test {

    public Test() {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<String> events = env.readTextFile("datastream.log");

        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());

    }

    public class JSONObject {
    }

    public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

}

但最直接的方法是使用匿名类,这样您就可以保留string类型:

DataStream<Tuple2<String, JSONObject>> MetaAlert
        = events
        .flatMap(new JSONParser())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
                // Your code here
            }
        });

最后,如果希望保留类,但又希望保持you key的类型不变,则可以实现KeySelector:

public class Test {

    public Test() {

        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
                    @Override
                    public String getKey(Tuple2<String, JSONObject> json) throws Exception {
                        return json.f0;
                    }
                })
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());
    }

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

}
 类似资料:
  • 我正在尝试使用主题列表中的单个kafka使用者组合两个kafka主题,进一步将流中的json字符串转换为POJO。然后,通过keyBy(On事件时间字段)将它们加入,并将它们合并为单个胖json,我计划使用窗口流并在窗口流上应用窗口函数。假设主题A 我有几个问题。 这种方法适合合并主题并创建单个JSON吗 所有窗口流上的窗口函数似乎工作不正常;任何指点都将不胜感激 代码片段: 我得到了- AllW

  • 假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我

  • 我有一个数据流是键控的,需要计算不同时间段(1分钟,5分钟,1天,1周)的翻滚计数。 有可能在一个应用程序中计算所有四个窗口计数吗?

  • 我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢

  • 我正在尝试flink的一些网络监控工作。我的目标是计算每个的不同。 我下面的代码工作,但性能真的很糟糕。似乎每个滑动窗口都重新计算所有事件,但这不应该是必要的。 例如,我们有时间秒1-600的事件。Flink可以得到每秒的累加器,所以我们每秒有600个累加器。当第一个滑动窗口过期时,flink只合并1-300的累加器,并销毁第二个1的累加器。此窗口还可以在最后一秒前预合并1-299。当第二个滑动窗

  • 的结果是一个元素流-因此,我希望从这个流中获得一个“具有最高计数的key”的更新流。 然后我通过一个常量(-因为这是一个全局操作)进行键控,并使用-这几乎可以实现:我得到一个最高计数流,但当前的最高计数是针对每个元素发出的。 我想我要找的是某种带有前一个值的过滤器,它只会在新值与前一个值不同时才会发出元素。 目前在Flink有可能吗?