当前位置: 首页 > 知识库问答 >
问题:

利用并行性生成有序窗口聚合(例如,前10个查询)

宁锐
2023-03-14

我正在尝试利用并行性来加速一个前10位的窗口操作。我的应用程序由具有时间戳和键和(即tuple2 )的事件组成,我的目标是为30分钟的滚动窗口生成前10个最频繁的键(使用事件时间)。为此,我的查询由一个入口、一个窗口和一个聚合阶段组成。换句话说,我的代码将需要执行类似以下内容的操作:

DataStream<Tuple3<Long, String, Integer>> s = env
    .readTextFile("data.csv")
    .map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
      @Override
      public Tuple3<Long, String, Integer> map(String s) throws Exception {
        String[] tokens = s.split(",");
        return new Tuple3<Long, String, Integer>(Long.parseLong(tokens[0]),
            tokens[1], 1);
      }})
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
          @Override
          public long extractAscendingTimestamp(Tuple3<Long, String, Integer> t) {
            return t.f0;
          }}).setParallelism(1);

以上是从一个CSV文件解析数据并分配事件时间(即入口)的代码。我将并行性设置为1的原因是,我需要事件按顺序显示,以便将它们分配给Windows。

接下来是棘手的部分,我试图在生成正确的(有序的)窗口结果的同时加快执行速度。

朴素(串行)执行

DataStream<Tuple2<Long, String>> windowedTopTen = s
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(30)))
        .apply(new SerialAggregation()).setParallelism(1);
DataStream<Tuple2<Long, String>> windowedTopTen = s
    .keyBy(1)
    .window(TumblingEventTimeWindows.of(Time.minutes(30)))
    .apply(new PartialAggregation()).setParallelism(N);

其中partialaggregation()将为不同的时间戳生成部分结果(不相交的键集)。换句话说,我的理解是,对于相同的时间戳t1,我将用partial_result_1partial_result_n,其中n是我设置的并行性。我的目标是聚合特定时间戳(如t1)的所有部分结果,但我不知道如何做到这一点。此外,当我能够将部分结果与匹配的时间戳组合在一起时,我将如何生成一个数据流,其元组是基于时间戳排序的(就像朴素解决方案产生的结果一样)。

问题

  1. 如何完成并行(更快)方法以产生所需结果并将部分结果与匹配的时间戳结合?
  2. 在我将每个时间戳的部分结果合并后,是否有方法生成一个数据流,其中结果按时间戳的顺序显示?

共有1个答案

甄永年
2023-03-14

首先,如果将Tuple2替换为Tuple3,其中字符串是单个键,整数是计数器,那么将会更容易将部分前10名结果组合为整体前10名。

然后,您可以使用windowAll和一个聚合窗口函数,在窗口的第二层中添加前10个键(总的)及其计数。

 类似资料:
  • 尝试合并多个 Kafka 流,聚合

  • 我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?

  • 当我现在运行我的脚本时,我得到了两个独立的窗口。一个窗口,让我们称之为“窗口A”,我的文本和输入框和一个空窗口,让我们称之为“窗口B”。 当我在“窗口A”中单击“运行”时,我的phyton脚本(在本例中为TennisMatchProbability.py)将被触发,该脚本的结果(TennisMatchProbability.py)将显示在“窗口B”中。 这是“tennismatchprobabil

  • 我们有一个数据流,其中每个元素都属于这种类型: 我们希望聚合此流并每周输出一次的总和。 当前解决方案: flink管道的示例如下所示: 输入 如果窗口在记录和之间结束,我们的输出将是: Id和仍将在flink管道中,并将在下周输出。 因此,下周我们的总产量将是: 新规定: 我们现在还想知道每个记录在哪一周被处理。换句话说,我们的新产出应该是: 但我们还需要这样的额外输出: 这个怎么处理? Flin

  • 与group by/join相比,我对在窗口上运行聚合函数的性能特征感兴趣。在本例中,我对具有自定义帧边界或顺序的窗口函数不感兴趣,而只是作为运行聚合函数的一种方式。 请注意,我只对大小适中的数据量的批处理(非流式)性能感兴趣,因此我禁用了以下广播连接。 例如,假设我们从以下DataFrame开始: 假设我们想要计算每个名称出现的次数,然后为具有匹配名称的行提供该计数。 根据执行计划,窗口化看起来

  • 我有一个用例,其中我收到包含不同信息集的事件流,并希望对它们执行聚合。对于这些聚合中的每一个,都需要多个翻滚窗口,例如:每日,每周,每月,每年等。 聚合最初是所看到的计数的基本加法,但后来可能是对这些事件的一些分析/联接处理。因此,如果一个事件A每天来一次,另一个事件B每周来一次,结果将是这样的: 用例只是围绕翻滚的窗口而不是滑动窗口,我正在研究如何实现这个用例。主要问题是我不想等到窗口结束,而是