当前位置: 首页 > 知识库问答 >
问题:

滑动时间窗口的Flink性能问题

颜霖
2023-03-14

我正在尝试flink的一些网络监控工作。我的目标是计算每个src\u ip的不同dst\u ip

我下面的代码工作,但性能真的很糟糕。似乎每个滑动窗口都重新计算所有事件,但这不应该是必要的。

例如,我们有时间秒1-600的事件。Flink可以得到每秒的累加器,所以我们每秒有600个累加器。当第一个滑动窗口过期时,flink只合并1-300的累加器,并销毁第二个1的累加器。此窗口还可以在最后一秒前预合并1-299。当第二个滑动窗口过期时,flink只合并2-301的累加器,并销毁第二个2的累加器。等等

这种方法比将事件分配给多个窗口和计算每个窗口的聚合要有效得多。

Flink支持这一点吗?我能自己用flink得到类似的函数吗?

谢谢!

public static class AverageAccumulator2 {
    String key;
    Set<String> target;
    AverageAccumulator2() {
        target = new HashSet<>();
    }
}

public static class Average2 implements AggregateFunction<ObjectNode, AverageAccumulator2, Tuple3<String, Long, Set<String>>> {
    @Override
    public AverageAccumulator2 createAccumulator() {
        return new AverageAccumulator2();
    }

    @Override
    public AverageAccumulator2 add(ObjectNode value, AverageAccumulator2 accumulator) {
        accumulator.key = value.get("value").get("src_ip").asText();
        accumulator.target.add(value.get("value").get("dst_ip").asText());
        return accumulator;
    }
    @Override
    public Tuple3<String, Long, Set<String>> getResult(AverageAccumulator2 accumulator) {
        return new Tuple3<>(accumulator.key, (long) accumulator.target.size(), accumulator.target);
    }

    @Override
    public AverageAccumulator2 merge(AverageAccumulator2 a, AverageAccumulator2 b) {
        a.target.addAll(b.target);
        return a;
    }
}

final SingleOutputStreamOperator<Tuple3<String, Long, Set<String>> > process2 =
stream.keyBy(value -> value.get("value").get("sip").asText())
                    .timeWindow(Time.seconds(300),Time.seconds(1))
                    .aggregate(new Average2());

共有1个答案

林鹭洋
2023-03-14

正如您所观察到的,Flink不会尝试优化滑动窗口。对于细粒度滑动,这确实会变得非常昂贵。

您可以做的是使用ProcessFunction实现您自己的逻辑来处理状态和定时器——您可以像您概述的那样实现它。您将拥有对于每个传入记录更新用于累积结果的数据结构的过程元素方法,以及每秒触发一次、将部分结果合并在一起并向下游发送结果的onTimer方法。

 类似资料:
  • 我的用例 null 问题 然而,如果我没有理解错的话,这将意味着由于滑动窗口的性质,单个事件将产生7*24*6=1008个记录。所以我的问题是,我如何才能减少纯粹的数额?

  • 假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我

  • 我是Flink的新手,需要方法的帮助。我有时间颗粒度为5分钟的事件流。我想通过调用rest API来获取事件的元数据,其中包含过去1小时数据点的历史事件,即过去12点(5分钟时间颗粒度)。 e、 g事件的时间戳为10:00、10:05、10:10、10:15等,因此如果我想获取时间戳为11:00的事件元数据,我将调用send发送所有时间戳为10:00、10:05、10:10、10:15的事件。。1

  • 我有一个拼花地板数据表,结构如下: null null 我已经调整了以下设置,希望降低总时间: spark.memory.storagefraction 0.02 spark.sql.windowexec.buffer.in.memory.threshold 100000 spark.sql.constraintpropagation.enabled false 第二种方法帮助防止了日志中出现的一

  • 我尝试在Drools 5.4.0中使用滑动时间窗口。最终版本,并提供以下官方文档片段: 和 我认为混合形式是有效的: 但是,除非我弄错了,否则它的行为并不像预期的那样(即只考虑在过去2分钟内发生的RHT股票滴答声)。我不明白结果的逻辑。 有人能给我解释一下这个把戏吗? 谢谢

  • 问题内容: 我们几乎将elasticsearch用作缓存,存储在时间窗口中找到的文档。我们不断插入许多不同大小的文档,然后使用结合日期过滤器的文本查询在ES中进行搜索,因此当前线程不会获取已经看到的文档。像这样: “(((word1 AND word 2)OR(word3 AND word4))AND insertDate> 1389000” 我们使用TTL功能在elasticsearch中将数据