当前位置: 首页 > 知识库问答 >
问题:

为什么flink不删除延迟数据?

萧阳波
2023-03-14

我在计算一个简单蒸汽的最大值,结果是:

(S11000,S1,值:999)

(S12000,S1,值:41)

最后一行数据明显迟到了:new SensorReading("S1",999,100L)

为什么按第一个窗口(0-1000)计算?

我认为第一个窗口应该在SensorReading("S1",41,1000L)到达时触发。

对于这个结果,我很疑惑。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(TrainingBase.parallelism);

        DataStream<SensorReading> input = env.fromElements(
                new SensorReading("S1", 35, 500L),
                new SensorReading("S1", 42, 999L),
                new SensorReading("S1", 41, 1000L),
                new SensorReading("S1", 40, 1200L),
                new SensorReading("S1", 23, 1400L),
                new SensorReading("S1", 999, 100L)
        );


        input.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
            private long currentMaxTimestamp;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp);
            }

            @Override
            public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
                currentMaxTimestamp = element.ts;
                return currentMaxTimestamp;
            }
        })
                .keyBy((KeySelector<SensorReading, String>) value -> value.sensorName)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .reduce(new MyReducingMax(), new MyWindowFunction())
                .print();

        env.execute();

MyReductingMax(),MyWindowFunction()

private static class MyReducingMax implements ReduceFunction<SensorReading> {
        public SensorReading reduce(SensorReading r1, SensorReading r2) {
            return r1.getValue() > r2.getValue() ? r1 : r2;
        }
    }

private static class MyWindowFunction extends
            ProcessWindowFunction<SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

        @Override
        public void process(
                String key,
                Context context,
                Iterable<SensorReading> maxReading,
                Collector<Tuple3<String, Long, SensorReading>> out) {

            SensorReading max = maxReading.iterator().next();
            out.collect(new Tuple3<>(key, context.window().getEnd(), max));
        }
    }

    public static class SensorReading {
        String sensorName;
        int value;
        Long ts;

        public SensorReading() {
        }

        public SensorReading(String sensorName, int value, Long ts) {
            this.sensorName = sensorName;
            this.value = value;
            this.ts = ts;
        }

        public Long getTs() {
            return ts;
        }

        public void setTs(Long ts) {
            this.ts = ts;
        }

        public String getSensorName() {
            return sensorName;
        }

        public void setSensorName(String sensorName) {
            this.sensorName = sensorName;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public String toString() {

            return this.sensorName + "(" + this.ts + ") value: " + this.value;
        }

        ;
    }

共有1个答案

司徒光霁
2023-03-14

一个Assignerwith周期水印不会在每一个可能的机会创建水印。相反,Flink会定期调用这样的分配者来获取最新的水印,默认情况下,这是每200毫秒(实时,而不是事件时间)完成一次。此间隔由ExecutionConfig.setAutoWatermark Interval(...)控制。

这意味着几乎可以肯定的是,在调用水印赋值器之前,您的所有六个测试事件都已被处理。

如果您关心更可预测的水印,您可以使用AssignerSusPunctuatedWatermark代替。

顺便说一句,按照水印赋值器的编写方式,所有无序事件都可能延迟。更典型的做法是使用BoundedAutofordernessTimestampExtractor,允许出现一些无序情况。

 类似资料:
  • 我很难理解水印和允许迟到的概念。 以下是[邮件存档]的摘录|https://www.mail-archive.com/user@Flink。阿帕奇。组织/msg08758。html]这谈到了水印,但我还有几个问题。以下是引用的示例: 假设您有一个,具有2分钟的绑定和10分钟的翻转窗口,从12:00开始到12:10结束: 如果您具有以下流序列: 不允许迟到 当窗口操作符接收到<代码> 允许迟到3分钟

  • 问题内容: 我正在编写一个小的脚本,该脚本在页面加载时将CSS子类分配给三个元素。800ms之后,我希望它删除该子类。 我认为这段代码可以做到: 遗憾的是,任何帮助将不胜感激。提前致谢。 问题答案: 您可以使用 功能: 在指定的延迟后调用函数或执行代码段。

  • 在vertx web backpressure示例中,假设我作为标准verticle启动服务器verticle,那么observeOn(RxHelper.scheduler(vertx.getDelegate())会做什么。 我在一台8核机器上部署了8个事件循环线程和8个Server verticle实例,我没有在路由的处理程序中阻止IO调用

  • 如何在允许的延迟期结束之前“清除”窗口元数据(WindowOperator和InternalTimer)? 是否可以将此元数据与窗口数据本身一起清除? 我们不介意丢失元数据——不需要根据之前非延迟数据的上下文来处理具有相同关键时间的延迟事件。 一些背景知识- [目前正在使用Flink-v1.6]我们正在处理事件时间窗口,并处理大量具有唯一键的事件。95%的活动不会迟到,只会开火一次。 我们的工作规

  • 我读过几篇关于Flink的文章,在读Flink的博客时,我遇到了这样一句话:“最多延迟60秒(事件最多延迟1分钟)” 是否在Flink中定义乱序事件持续时间用于技术“水印”,如果不是,那么内部目的是什么?

  • 问题内容: 我试图在收到请求后立即发送页面响应,然后进行处理,但是我发现响应即使按代码顺序排列也没有“首先”发送。在现实生活中,我有一个页面可供上传一个Excel工作表,该工作表保存到数据库中需要花费时间(50000+行),并且希望更新用户进度。这是一个简化的示例;(取决于您有多少RAM,您可能需要添加几个零来计数才能看到结果) 问题答案: HTTP协议的原始概念是一个简单的请求- 响应服务器-客