当前位置: 首页 > 知识库问答 >
问题:

Flink窗口函数getResult未触发

萧嘉禧
2023-03-14

我正在尝试在我的Flink作业中使用事件时间,并使用BoundedOutOfOrdernessTimestampExtractor来提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得Aggregate Function中的getResult根本没有调用。我可以看到数据进入add函数。

我已经设置了getEnv()。getConfig()。设置自动水印间隔(1000L) 我尝试过

 eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
            .allowedLateness(WINDOW_LATENESS)
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))

还有会话窗口

eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))

所有的水印都显示没有水印,我怎么能让Flink忽略这个没有水印的东西呢?

共有3个答案

韦宏朗
2023-03-14

我也有同样的问题-src可能会长期处于非活动状态
下面的解决方案基于水印稀疏性。

这是一个独立的Flink作业,用于演示该概念。

package com.demo.playground.flink.sleepysrc;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.WatermarksWithIdleness;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;


public class SleepyJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final EventGenerator eventGenerator = new EventGenerator();
        WatermarkStrategy<Event> strategy = WatermarkStrategy.
                <Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)).
                withIdleness(Duration.ofSeconds(Constants.IDLE_TIME_SEC)).
                withTimestampAssigner((event, timestamp) -> event.timestamp);
        final DataStream<Event> events = env.addSource(eventGenerator).assignTimestampsAndWatermarks(strategy);
        KeyedStream<Event, String> eventStringKeyedStream = events.keyBy((Event event) -> event.id);
        WindowedStream<Event, String, TimeWindow> windowedStream = eventStringKeyedStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(Constants.SESSION_WINDOW_GAP)));
        windowedStream.allowedLateness(Time.milliseconds(1000));
        SingleOutputStreamOperator<Object> result = windowedStream.process(new ProcessWindowFunction<Event, Object, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<Event> events, Collector<Object> collector) {
                int counter = 0;
                for (Event e : events) {
                    Utils.print(++counter + ") inside process: " + e);
                }
                Utils.print("--- Process Done ----");
            }
        });
        result.print();
        env.execute("Sleepy flink src demo");
    }


    private static class Event {
        public Event(String id) {
            this.timestamp = System.currentTimeMillis();
            this.eventData = "not_important_" + this.timestamp;
            this.id = id;
        }

        @Override
        public String toString() {
            return "Event{" +
                    "id=" + id +
                    ", timestamp=" + timestamp +
                    ", eventData='" + eventData + '\'' +
                    '}';
        }

        public String id;
        public long timestamp;
        public String eventData;
    }

    private static class EventGenerator implements SourceFunction<Event> {

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            /**
             *  Here is the sleepy src - after NUM_OF_EVENTS events are collected , the code goes to a SHORT_SLEEP_TIME sleep
             *  We would like to detect this inactivity and FIRE the window
             */
            int counter = 0;
            while (running) {
                String id = Long.toString(System.currentTimeMillis());
                Utils.print(String.format("Generating %d events with id %s", 2 * Constants.NUM_OF_EVENTS, id));
                while (counter < Constants.NUM_OF_EVENTS) {
                    Event event = new Event(id);
                    ctx.collect(event);
                    counter++;
                    Thread.sleep(Constants.VERY_SHORT_SLEEP_TIME);
                }
                // here we create a delay:
                // a time of inactivity where
                // we would like to FIRE the window
                Thread.sleep(Constants.SHORT_SLEEP_TIME);
                counter = 0;
                while (counter < Constants.NUM_OF_EVENTS) {
                    Event event = new Event(id);
                    ctx.collect(event);
                    counter++;
                    Thread.sleep(Constants.VERY_SHORT_SLEEP_TIME);
                }
                Thread.sleep(Constants.LONG_SLEEP_TIME);
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }

        private volatile boolean running = true;

    }

    private static final class Constants {
        public static final int VERY_SHORT_SLEEP_TIME = 300;
        public static final int SHORT_SLEEP_TIME = 8000;
        public static final int IDLE_TIME_SEC = 5;
        public static final int LONG_SLEEP_TIME = SHORT_SLEEP_TIME * 5;
        public static final long SESSION_WINDOW_GAP = 60 * 1000;
        public static final int NUM_OF_EVENTS = 4;
    }

    private static final class Utils {
        public static void print(Object obj) {
            System.out.println(new java.util.Date() + " > " + obj);
        }
    }
}
南宫凯康
2023-03-14

引入了一种新的具有空闲能力的水印。Flink在计算最小值时将忽略这些空闲水印,因此将考虑包含数据的单个分区。https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.html

东方英豪
2023-03-14

仅供参考,这通常被称为“空闲源”问题。发生这种情况是因为每当Flink运算符有两个或多个输入时,它的水印是其输入中水印的最小值。如果其中一个输入失速,它的水印不再前进。

请注意,Flink没有每个键的水印——给定的运算符通常会在多个键的事件中多路复用。只要一些事件流经给定任务的输入流,它的水印就会前进,空闲键的事件时间计时器仍然会触发。要发生这种“空闲源”问题,任务必须有一个完全空闲的输入流。

如果可以安排,最好的解决方案是让数据源包含keepalive事件。这将允许您自信地推进水印,知道源只是空闲的,而不是离线的。

如果这是不可能的,并且如果您有一些非空闲的源,那么您可以在BoundedAutofordernessTimestampExtractor前面(在keyBy之前)放置一个重新平衡(),这样每个实例都可以继续接收一些事件并提升其水印。这是以牺牲额外的网络洗牌为代价的。

也许最常用的解决方案是使用水印生成器,该生成器检测空闲状态,并根据处理时间计时器人工推进水印。ProcessingTimeTrailingBoundedAutofordernessTimestampExtractor就是一个例子。

 类似资料:
  • 我正在使用翻滚窗口(5分钟)和,因为我的源代码来自Kafka。但是窗口总是运行超过5分钟。有人能建议吗?

  • 我正在尝试加入apache flink中的两个流以获得一些结果。 我的项目的当前状态是,我正在获取twitter数据并将其映射到一个2元组中,其中保存用户的语言和定义时间窗口中的推文总和。我这样做是为了每种语言的推文数量和每种语言的转发。推文/转发聚合在其他进程中运行良好。 我现在想得到一个时间窗口内转发次数占所有推文次数的百分比。 因此我使用以下代码: 当我打印或时,输出似乎很好。我的问题是我从

  • 我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。 这里发生的事情是,它给我的结果,我以前的窗口聚合以及。 假设前30秒我得到结果10。 接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。 因此,我的问题是如何为每个窗口获得新的结果。

  • 问题陈述:来自kafka源的流式事件。这些事件有效载荷为字符串格式。将它们解析为文档,并根据事件时间每隔5秒将其批量插入DB。 函数正在执行。但程序控制不会进入。因此不会发生批量插入。我尝试了键控和非键控窗口。它们都不工作。没有抛出错误。 flink版本:1.15.0 下面是我的主要方法的代码。我应该如何解决这个问题?

  • 主要内容:1.窗口概述,2.窗口分类,3.细分,4.窗口Api,5.窗口分配器 Window Assigners,6.窗口函数,7.TopN 实例1.窗口概述 Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 在 Flink 中, 窗口就是用来处理无界流的核心。我们很容易把窗口想象成一个固定位置的“框”,数据源源不断地流过来,到某个时间点窗口