当前位置: 首页 > 知识库问答 >
问题:

Flink timeWindow获取开始时间

束飞捷
2023-03-14

我正在计算一个时间窗口上的计数(求和1),如下所示:

mappedUserTrackingEvent
            .keyBy("videoId", "userId")
            .timeWindow(Time.seconds(30))
            .sum("count")

我还想将窗口开始时间添加为一个关键字段。所以结果会是这样的:

key: videoId=123,userId=234,time=2016-09-16T17:01:30
value: 50

所以本质上是按窗口聚合计数。最终目标是绘制这些窗口的直方图。

如何将窗口的开头添加为键中的字段?然后,在这种情况下,将窗口对齐到00秒或30秒?这可能吗?

共有2个答案

谭坚诚
2023-03-14

您可以使用方法“聚合”而不是“求和”
在“聚合”中,设置第二个参数实现窗口函数或扩展处理窗口函数
我正在使用Flink-1.4.0,建议使用ProcessWindowFunction,例如:

mappedUserTrackingEvent
    .keyBy("videoId", "userId")
    .timeWindow(Time.seconds(30))
    .aggregate(new Count(), new MyProcessWindowFunction();

public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long,  Integer>, Tuple, TimeWindow>
{
    @Override
    public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long,  Integer>> collector) throws Exception
    {
        context.currentProcessingTime();
        context.window().getStart();
    }
}
阎智
2023-03-14

WindowFunctionApplication()方法提供了一个Window对象,如果您使用keyBy(). timeWindow(),它是一个TimeWindowTimeWindow对象有两个方法,getStart()getEnd(),分别返回窗口开始和结束的时间戳。

目前无法将sum()聚合与WindowFunction一起使用。您需要执行以下操作:

 mappedUserTrackingEvent
        .keyBy("videoId", "userId")
        .timeWindow(Time.seconds(30))
        .apply(new MySumReduceFunction(), new MyWindowFunction());`

MySumReduceFunction实现了ReduceFunction接口,并通过增量聚合到达窗口的元素来计算总和。MyWindowFunction实现了WindowFunction。它通过Iterable参数接收聚合值,并使用从TimeWindow参数获得的时间戳丰富值。

 类似资料:
  • 我想从JavaGUI到数据库获取startTime和endTime的值。 电脑座位班 Cobadatabase类 这里的问题是,当单击登录按钮时,开始时间显示在我的数据库中的开始时间和结束时间列上。当单击注销按钮时,将在数据库中创建另一个行,该行在starTime和endTime列上都包含endTime。我想知道为什么会这样...

  • 问题内容: 我想知道如何检索实例的父结构。 我不知道如何实现这一目标。 例如: 干杯 问题答案: 您应该保留一个指向引擎盖的指针。 当你增加房子 那么您可以轻松更改,尽管此时可能不需要吸气剂。

  • 问题内容: 我有含有天自变量历元基准日期的用于某一日期。 有人知道将该变量转换为a的方法吗? 问题答案: 以下应该工作: 关于时区的注释: 使用程序运行所在系统的默认时区创建一个新实例。由于Epoch是相对于UTC(Java中的GMT)的,因此必须谨慎处理与UTC不同的任何时区。以下程序说明了该问题: 此打印 这表明仅使用eg是不够的。在这种情况下,必须始终考虑到一天中的什么时间。这可以通过显式使

  • 我有一个enum,其值如下:,,等。 我试图编写一个方法,它接受并返回该时间段的开始和结束日期。 我研究了新的Java8类,该类可能需要一个开始时间和结束时间,但似乎没有任何干净的方法可以在之后检索这些值。 如果不使用(似乎是错误的数据结构)或一些难看的日期时间算法,我如何能一次干净地返回开始日期和结束日期?

  • 问题内容: 我在我的应用程序中使用了自定义日历。我已为用户提供了选择一周中第一天的选项,可能是: 我想获取一个月中的星期数-取决于星期几的开始时间,而不是星期几的默认值- 。 码: 问题答案: 尝试 在您的情况下,您可能需要执行以下操作: 并用诸如

  • 问题内容: 如果可能的话,在以下情况下,我希望使用joda或非joda解决方案 假设我的一周从2012年5月2日开始,给定的当前日期为02/22/2011。我需要计算给定当前日期的星期开始和结束日期。因此,我的解决方案的星期应该从02/19开始,而星期在02/25结束。为简单起见,我将我的工作日设置为02/05/2011,但是可能是任何一天,我的工作日始终为7天。 我现有的代码如下,但似乎无法按预