当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:窗口还原函数永远不会执行

须景胜
2023-03-14

下面是代码片段,我在其中使用了基于翻滚事件时间的窗口

DataStream<OHLC> ohlcStream = stockStream.assignTimestampsAndWatermarks(new TimestampExtractor()).map(new mapStockToOhlc()).keyBy((KeySelector<OHLC, Long>) o -> o.getMinuteKey())
        .timeWindow(Time.seconds(60))
        .reduce(new myAggFunction());

不幸的是,它似乎从未执行过reduce函数。如果使用上面的代码进行窗口处理,reduce函数可以正常工作。下面是时间戳提取器的代码。30秒水印延迟仅用作测试值,但一分钟翻转窗口为m

    public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<StockTrade> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - 30000);
    }

    @Override
    public long extractTimestamp(StockTrade stockTrade, long l) {
        BigDecimal bd = new BigDecimal(stockTrade.getTime());
        // bd contains miliseconds timestamp 1498658629.036
        return bd.longValue();
    }
}

bd.longValue(),它返回秒时间戳1498658629,因为我的窗口也是以秒为单位定义的。< br >当我使用返回分钟时间戳的bd.longValue()/60时,调用了reduce函数。我的输出文件包含每个reduce操作的所有记录

{time=1498717692.000, minuteTime=24978628, n=1, open=2248.0}
{time=1498717692.000, minuteTime=24978628, n=2, open=2248.0}
...
{time=1498717692.000, minuteTime=24978628, n=8, open=2248.0}

那么,任何人都可以向我解释,发生了什么事吗?很多。

共有1个答案

程俊力
2023-03-14

通常,水印应相对于数据中的时间戳,而不应基于系统时钟。使用事件时间的一大好处是,同一个应用程序可用于重新处理历史数据或处理当前数据,但是,如果您将时间戳与系统时钟进行比较,则这是不可能的,就像您在这里所做的那样。

水印可以被认为是一个语句,表明时间戳小于水印的所有数据都已到达。换句话说,时间戳小于当前水位线的任何数据都将被视为延迟。我的猜测是,您没有看到任何结果,因为您的水印导致所有数据都被视为延迟,并且窗口操作员正在删除所有这些延迟数据。

我建议你改用 BoundedOutOfOrdernessTimestampExtractor。它的工作原理是跟踪到目前为止在数据流中看到的最大时间戳,并从该最大时间戳(而不是系统时钟)中减去延迟。源代码,以防您好奇。

 类似资料:
  • 问题内容: 我正在使用Flask-WTF: 这是我的表格: 这是控制器: 现在的问题是,如果您查看我的打印语句,它将始终打印已提交的内容,但从不打印有效的内容,并且永远不会执行validate_on_submit()。为什么? 问题答案: 您没有在HTML表单中插入CSRF字段。 添加到模板(docs)后,表单将按预期验证。 验证表单后添加,以查看出现的错误。 在验证之前将为空。在这种情况下,会出

  • 在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl

  • 问题内容: 该程序的目的是单击以创建圆,ballball类扩展了test1,当test1检测到鼠标单击时,即创建了ballball对象。但是paint / paintComponent方法永远不会执行。在我的程序结构中,是否可以将圆圈绘制到超类JPanel? 问题答案: 不是,它没有可以覆盖的方法。相反,您可以扩展a 并将其添加到框架中。

  • 我有这份工作: 作业应该每小时运行一次。我以为问题是cronexpression。这就是为什么我把它改成上面的表达式。在我有这个表达之前: null

  • 在嵌入式框架内使用JComboBox时,我确实有一个奇怪的焦点问题。 我创建了一个新的Shell 我用来创建嵌入式框架的外壳,如 我添加了一个JComboBox(有2个元素)到rootGroup。当我选择ComboBox打开下拉菜单时,我可以拖放shell窗口,而不会丢失组合框中的焦点。 当外壳窗口移动到另一个屏幕区域时,下拉菜单仍然在屏幕上的同一位置。下拉菜单不再显示在组合框下面。 我试图为某些

  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?