我有一个用例,我想我需要一些关于如何处理它的帮助。因为我是流媒体和Flink的新手,所以我会尽量描述我想要实现的目标。对不起,如果我没有使用正式和正确的语言。
我的代码将用java编写,但我不想用python编写代码,也不想用伪代码或方法编写代码。
TL: DR
背景:
我想做的是:
我的psuedo解决方案:
问题-当新事件发生时,需要:
问题是:
那有可能吗?
换句话说,我只在两个“连续”事件之间建立联系。
非常感谢你。
也许展示**BATCH案例的解决方案会显示我正在努力做到最好:**
for i in range(grouped_events.length):
event_A = grouped_events[i]
event_B = grouped_events[i+1]
if event_B.get("time") - event_A.get("time") < 30:
if event_B.get("color") == event_A.get("color"):
if event_B.get("size") > event_A.get("size"):
create_result_event(event_A, event_B)
我的(天真的)尝试到目前为止与Flink在java
**求和函数只是我的函数创建新结果对象的占位符。。。
>
是否尝试在第二个窗口上运行一些事件并对所有事件进行迭代?
DataStream
.keyBy(threeEvent -> threeEvent.getUserId())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.sum("size")
.print();
DataStream
.keyBy(threeEvent -> threeEvent.getUserId())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new processFunction());
public static class processFunction extends ProcessWindowFunction<ThreeEvent, Tuple3<Long, Long, Float>, Long, TimeWindow> {
@Override
public void process(Long key, Context context, Iterable<ThreeEvent> threeEvents, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
Float sumOfSize = 0F;
for (ThreeEvent f : threeEvents) {
sumOfSize += f.getSize();
}
out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
}
}
当然,您可以使用windows创建排序和分析的小批次,但很难正确处理窗口边界(如果应该配对的事件位于不同的窗口中会怎么样?)。
这看起来更容易通过键控流和有状态平面图来完成。只需使用RichFlatMapFunction,并使用一段键控状态(ValueState),它记住每个键的前一个事件。然后,在处理每个事件时,将其与保存的事件进行比较,如果发生这种情况,则生成结果,并更新状态。
您可以在flink训练和flink留档中阅读有关使用flink键控状态的内容。
关于你的用例,我关心的一件事是你的事件是否会乱序到达。是否需要首先按时间戳对事件进行排序才能获得正确的结果?这不是微不足道的。如果这是一个问题,那么我建议您使用FlinkSQLMATCH_RECOGNIZE或CEP库,这两个库都是为对事件流进行模式识别而设计的,并且会为您处理流的html" target="_blank">排序(您只需必须提供时间戳和水印)。
此查询可能并不完全正确,但希望能够传达如何使用match Recognite执行类似操作的风格:
SELECT * FROM Events
MATCH_RECOGNIZE (
PARTITION BY userId
ORDER BY eventTime
MEASURES
A.userId as userId,
A.color as color,
A.size as aSize,
B.size as bSize
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B)
DEFINE
A AS true,
B AS ( timestampDiff(SECOND, A.eventTime, B.eventTime) < 30)
AND A.color = B.color
AND A.size < B.size )
);
这也可以在CEP中非常自然地完成,其中比较连续事件的基础是使用迭代条件,您可以在子句中使用来处理时间约束。
Flink在这里提供了一个示例:https://www.ververica.com/blog/stream-processing-introduction-event-time-apache-flink这描述了这样一个场景:有人在玩游戏,由于subway而失去连接,然后当他重新联机时,所有数据都恢复了,可以进行排序和处理。 我的理解是,如果有更多的球员,有两种选择: > 所有其他的将被延迟,等待该
我正在寻找一种可以限制当前正在处理的数据量的选项。 用例:我正在从Kafka数据流中读取并处理该数据,我想限制正在运行的消息数量。这样做的原因是第三方应用程序的吞吐量。通常这不是问题,但在背压的场景中,由于这些故障,经常会出现故障和应用程序重新启动。
我试图使用process函数对一组事件进行处理。我正在使用事件时间和键控流。我面临的问题是,水印值始终为9223372036854725808。我已经对print语句进行了调试,它显示如下: 时间戳------1583128014000提取时间戳1583128014000当前水印------9223372036854775808 时间戳------1583128048000提取时间戳1583128
总之,我希望flatMap1()和flatMap2()按照我在事件中设置的时间戳的顺序被调用。但那不是真的。
我有一些能量计,将继续产生计数器值,这是一个累积指标。即不断增加,直到计数器复位。 有一个实时ETL作业,它在事件时间的两个连续值之间进行减法。 例如。 此外,有时事件可能没有按顺序接收。 如何使用Apache Flink流式API实现?最好使用Java中的示例。
null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有