我正在尝试在我的Flink作业中使用事件时间,并使用BoundedOutOfOrdernessTimestampExtractor
来提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得Aggregate Function
中的getResult
根本没有调用。我可以看到数据进入add
函数。
我已经设置了getEnv()。getConfig()。设置自动水印间隔(1000L) 我尝试过
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
.allowedLateness(WINDOW_LATENESS)
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
还有会话窗口
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(EventTimeSessionWindows.withGap(Time.seconds(30)))
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
所有的水印都显示没有水印,我怎么能让Flink忽略这个没有水印的东西呢?
我也有同样的问题-src可能会长期处于非活动状态
下面的解决方案基于水印稀疏性。
这是一个独立的Flink作业,用于演示该概念。
package com.demo.playground.flink.sleepysrc;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.WatermarksWithIdleness;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class SleepyJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final EventGenerator eventGenerator = new EventGenerator();
WatermarkStrategy<Event> strategy = WatermarkStrategy.
<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)).
withIdleness(Duration.ofSeconds(Constants.IDLE_TIME_SEC)).
withTimestampAssigner((event, timestamp) -> event.timestamp);
final DataStream<Event> events = env.addSource(eventGenerator).assignTimestampsAndWatermarks(strategy);
KeyedStream<Event, String> eventStringKeyedStream = events.keyBy((Event event) -> event.id);
WindowedStream<Event, String, TimeWindow> windowedStream = eventStringKeyedStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(Constants.SESSION_WINDOW_GAP)));
windowedStream.allowedLateness(Time.milliseconds(1000));
SingleOutputStreamOperator<Object> result = windowedStream.process(new ProcessWindowFunction<Event, Object, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Event> events, Collector<Object> collector) {
int counter = 0;
for (Event e : events) {
Utils.print(++counter + ") inside process: " + e);
}
Utils.print("--- Process Done ----");
}
});
result.print();
env.execute("Sleepy flink src demo");
}
private static class Event {
public Event(String id) {
this.timestamp = System.currentTimeMillis();
this.eventData = "not_important_" + this.timestamp;
this.id = id;
}
@Override
public String toString() {
return "Event{" +
"id=" + id +
", timestamp=" + timestamp +
", eventData='" + eventData + '\'' +
'}';
}
public String id;
public long timestamp;
public String eventData;
}
private static class EventGenerator implements SourceFunction<Event> {
@Override
public void run(SourceContext<Event> ctx) throws Exception {
/**
* Here is the sleepy src - after NUM_OF_EVENTS events are collected , the code goes to a SHORT_SLEEP_TIME sleep
* We would like to detect this inactivity and FIRE the window
*/
int counter = 0;
while (running) {
String id = Long.toString(System.currentTimeMillis());
Utils.print(String.format("Generating %d events with id %s", 2 * Constants.NUM_OF_EVENTS, id));
while (counter < Constants.NUM_OF_EVENTS) {
Event event = new Event(id);
ctx.collect(event);
counter++;
Thread.sleep(Constants.VERY_SHORT_SLEEP_TIME);
}
// here we create a delay:
// a time of inactivity where
// we would like to FIRE the window
Thread.sleep(Constants.SHORT_SLEEP_TIME);
counter = 0;
while (counter < Constants.NUM_OF_EVENTS) {
Event event = new Event(id);
ctx.collect(event);
counter++;
Thread.sleep(Constants.VERY_SHORT_SLEEP_TIME);
}
Thread.sleep(Constants.LONG_SLEEP_TIME);
}
}
@Override
public void cancel() {
this.running = false;
}
private volatile boolean running = true;
}
private static final class Constants {
public static final int VERY_SHORT_SLEEP_TIME = 300;
public static final int SHORT_SLEEP_TIME = 8000;
public static final int IDLE_TIME_SEC = 5;
public static final int LONG_SLEEP_TIME = SHORT_SLEEP_TIME * 5;
public static final long SESSION_WINDOW_GAP = 60 * 1000;
public static final int NUM_OF_EVENTS = 4;
}
private static final class Utils {
public static void print(Object obj) {
System.out.println(new java.util.Date() + " > " + obj);
}
}
}
引入了一种新的具有空闲能力的水印。Flink在计算最小值时将忽略这些空闲水印,因此将考虑包含数据的单个分区。https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.html
仅供参考,这通常被称为“空闲源”问题。发生这种情况是因为每当Flink运算符有两个或多个输入时,它的水印是其输入中水印的最小值。如果其中一个输入失速,它的水印不再前进。
请注意,Flink没有每个键的水印——给定的运算符通常会在多个键的事件中多路复用。只要一些事件流经给定任务的输入流,它的水印就会前进,空闲键的事件时间计时器仍然会触发。要发生这种“空闲源”问题,任务必须有一个完全空闲的输入流。
如果可以安排,最好的解决方案是让数据源包含keepalive事件。这将允许您自信地推进水印,知道源只是空闲的,而不是离线的。
如果这是不可能的,并且如果您有一些非空闲的源,那么您可以在BoundedAutofordernessTimestampExtractor前面(在keyBy之前)放置一个重新平衡(),这样每个实例都可以继续接收一些事件并提升其水印。这是以牺牲额外的网络洗牌为代价的。
也许最常用的解决方案是使用水印生成器,该生成器检测空闲状态,并根据处理时间计时器人工推进水印。ProcessingTimeTrailingBoundedAutofordernessTimestampExtractor就是一个例子。
我正在使用翻滚窗口(5分钟)和,因为我的源代码来自Kafka。但是窗口总是运行超过5分钟。有人能建议吗?
我正在尝试加入apache flink中的两个流以获得一些结果。 我的项目的当前状态是,我正在获取twitter数据并将其映射到一个2元组中,其中保存用户的语言和定义时间窗口中的推文总和。我这样做是为了每种语言的推文数量和每种语言的转发。推文/转发聚合在其他进程中运行良好。 我现在想得到一个时间窗口内转发次数占所有推文次数的百分比。 因此我使用以下代码: 当我打印或时,输出似乎很好。我的问题是我从
我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。 这里发生的事情是,它给我的结果,我以前的窗口聚合以及。 假设前30秒我得到结果10。 接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。 因此,我的问题是如何为每个窗口获得新的结果。
问题陈述:来自kafka源的流式事件。这些事件有效载荷为字符串格式。将它们解析为文档,并根据事件时间每隔5秒将其批量插入DB。 函数正在执行。但程序控制不会进入。因此不会发生批量插入。我尝试了键控和非键控窗口。它们都不工作。没有抛出错误。 flink版本:1.15.0 下面是我的主要方法的代码。我应该如何解决这个问题?
主要内容:1.窗口概述,2.窗口分类,3.细分,4.窗口Api,5.窗口分配器 Window Assigners,6.窗口函数,7.TopN 实例1.窗口概述 Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 在 Flink 中, 窗口就是用来处理无界流的核心。我们很容易把窗口想象成一个固定位置的“框”,数据源源不断地流过来,到某个时间点窗口