当前位置: 首页 > 知识库问答 >
问题:

Apache Flink翻滚窗口时间偏移与表API或SQL

车胤运
2023-03-14

任何人都知道如何使用时间偏移进行翻滚窗口-窗口大小为一天,时间偏移基于时区以小时为单位。

我找到了使用DataStream API执行此操作的示例,想知道如何使用Table API/SQL实现它。

下面是我使用DataStream API的代码。

DataStream<Tuple2<String, Timestamp>> inputStreamWithTime = inputStream
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Timestamp>>() {
    @Override
    public long extractTimestamp(Tuple2<String, Timestamp> element, long previousElementTimestamp) {
        return element.f1.getTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Timestamp> lastElement, long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }
});

inputStreamWithTime
.keyBy(new KeySelector<Tuple2<String,Timestamp>, String>() {
    @Override
    public String getKey(Tuple2<String, Timestamp> in) throws Exception {
        return in.f0;
    }
})
.window(TumblingEventTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))
.aggregate(new CountAggregate(), new ProcessTumblingWindowFunction())
.map((Tuple4<String, Long, Timestamp, Timestamp> value) -> {
    return new Tuple3<String, Long, Timestamp, Timestamp>(value.f0, value.f1, value.f2);
})
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.SQL_TIMESTAMP))
.addSink(getSink());

提前谢谢。

共有1个答案

越学博
2023-03-14

不幸的是,无法在表API/SQL中执行该窗口。目前,时间窗口始终在UTC中以该API级别定义。

一种可能的解决方法是改变源连接器中的时间,以便UTC窗口提供正确的结果。但是,您需要在接收器连接器中反向移动它。当然,只有在您不在另一个应用程序中使用源代码的情况下,这种黑客才能起作用。

 类似资料:
  • 在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl

  • 我有一个使用flink应用程序的场景,该应用程序接收以下格式的数据流: {“event\u id”:“c1s2s34”,“event\u create\u timestamp”:“2019-03-07 11:11:23”,“amount”:“104.67”} 我使用下面的滚动窗口来查找过去60秒内输入流的总和、计数和平均值。 键值。时间窗口(时间秒(60)) 然而,我如何标记聚合结果,以便我可以说

  • 我正在使用翻滚窗口(5分钟)和,因为我的源代码来自Kafka。但是窗口总是运行超过5分钟。有人能建议吗?

  • 使用翻滚窗口的apache flink应用程序遇到问题。窗口大小是10秒,我希望每隔10秒有一个resultSet数据流。然而,当最新窗口的结果集总是延迟时,除非我将更多数据推送到源流。 例如,如果我在“01:33:40.0”和“01:34:00.0”之间将多条记录推送到源流,然后停止查看日志,则不会发生任何事情。 我在“01:37:XX”上再次推送一些数据,然后将在“01:33:40.0”和“0

  • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想