当前位置: 首页 > 知识库问答 >
问题:

Apache Flink,窗口流的第二阶段求和

左劲
2023-03-14

此部分工作:

val stream = env
    // words is our Kafka topic
    .addSource(kafkaConsumer)
    // configure timestamp and watermark assigner
    .assignTimestampsAndWatermarks(new DeviceTSAssigner)
      .keyBy(_.deviceIdFull)
      .timeWindow(Time.minutes(5), Time.minutes(1))
    /* count events in window */
      .apply{ (key: String, window: TimeWindow, events: Iterable[DeviceData], out: Collector[(String, Long, Double)]) =>
        out.collect( (key, window.getEnd, events.map(_.currentReading).sum/events.size))
    }

  stream.print()

输出类似于

(device1,1530681420000,0.0)
(device2,1530681420000,0.0)
(device3,1530681480000,0.0)
(device4,1530681480000,0.0)
(device5,1530681480000,52066.0)
(device6,1530681480000,69039.0)
(device7,1530681480000,79939.0)
... 
...

下面的代码是我遇到问题的部分,我不确定具体如何编码,但我认为应该是这样的:

  val avgStream = stream
    .keyBy(2) // 2 represents the window.End from stream, see code above
    .timeWindow(Time.minutes(1)) // tumbling window
    .apply { (
               key: Long,
               window: TimeWindow,
               events: Iterable[(String, Long, Double)],
               out: Collector[(Long, Double)]) =>
      out.collect( (key, events.map( _._3 ).sum ))
    }
Error:(70, 52) type mismatch;
 found   : (Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[(Long, Double)]) => Unit
 required: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[?]) => Unit
                   out: Collector[(Long, Double)]) =>
val avgStream = stream
          .map(r => (r._2, r._3))
         .keyBy(0)
         .timeWindowAll(Time.minutes(1))
         .sum(1)
         .print()

共有1个答案

邓阳嘉
2023-03-14

我能够回答我自己的问题,所以上面描述的方法(参见更新07/04/2018)是有效的,但是更好的方法(特别是如果您想对流中的一个字段而是多个字段)是使用AggregateFunction。我之前也尝试过,但遇到了问题,因为缺少了“地图”步骤。

一旦我在第二阶段映射了流以提取出感兴趣的相关字段,我就可以使用AggregateFunction了。

这里的Flink文档和这个github链接都为此提供了一个示例。我从Flink文档示例开始,因为它非常容易理解,然后将代码转换成更像github示例的代码。

 类似资料:
  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl

  • 我正在JavaFX 2.0中制作一个应用程序。从我的主窗口开始一个带有一些设置的新窗口。调整完设置后,我想按一个像“保存更改”这样的按钮。 我想用这个按钮保存更改并关闭窗口。我所说的关闭意味着杀死它,而不是把它放在背景中或设置可见性。我读过关于方法阶段的书。关闭() http://docs.oracle.com/javafx/2.0/api/javafx/stage/Stage.html 正如您所

  • 我有一扇没有装饰的窗户: 我想知道我怎样才能把它做成一扇可拖动的未装饰窗户?我想在用户用鼠标右键选择窗口时更改其位置,然后在按住鼠标键的同时移动鼠标。 附言:我测试了这个解决方案,但它不起作用:

  • 所以我一直在为openGL课程玩土壤,我的项目遇到了一个奇怪的问题。我将显示两个窗口,每个窗口包含三个视口,在其中绘制一个带纹理的立方体或椭圆体,以展示我对深度和面剔除的理解。现在,所有形状都正确绘制,并且当单独测试时,它看起来完全符合预期,但是一旦我同时启用两个窗口,纹理就会在第一个窗口中禁用。我之前已经发布了类似的问题,所以只是为了澄清,如果我禁用窗口二,或者如果我在窗口二中使用窗口一绘制功能

  • 如何在ApacheFlink中为会话窗口分配id? 最后,我希望在会话窗口打开时,使用会话窗口id逐个充实事件(我不希望等到窗口关闭后再发出充实事件)。 我尝试使用AggregateFunction来实现这一点,但是我认为merge()并没有像我所期望的那样工作。它似乎是用于合并窗口而不是窗格(触发触发)。在我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态! 会话窗口ID将是落入窗口的