如何在ApacheFlink中为会话窗口分配id?
最后,我希望在会话窗口打开时,使用会话窗口id逐个充实事件(我不希望等到窗口关闭后再发出充实事件)。
我尝试使用AggregateFunction来实现这一点,但是我认为merge()并没有像我所期望的那样工作。它似乎是用于合并窗口而不是窗格(触发触发)。在我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态!
会话窗口ID将是落入窗口的第一个事件的时间戳(由于非保证排序,这可能意味着一些事件可能会落入具有较早时间戳的同一会话窗口-我对此没意见)。
public class FooSessionState {
private Long sessionCreationTime;
private FooMatch lastMatch;
}
/**
* Aggregator that assigns session ids to elements of a session window
*/
public class SessionIdAssigner implements
AggregateFunction<FooMatch, FooSessionState, FooSessionEvent> {
static final long serialVersionUID = 0L;
@Override
public FooSessionState createAccumulator() {
return new FooSessionState();
}
@Override
public FooSessionState add(FooMatch value, FooSessionState sessionState) {
if (sessionState.getSessionCreationTime() == null) {
sessionState.setSessionCreationTime(value.getReport().getTimestamp());
}
sessionState.setLastMatch(value);
return sessionState;
}
@Override
public FooSessionEvent getResult(FooSessionState accumulator) {
FooSessionEvent sessionEvent = new FooSessionEvent();
sessionEvent.setFooMatch(accumulator.getLastMatch());
sessionEvent.setSessionCreationTime(accumulator.getSessionCreationTime());
return sessionEvent;
}
@Override
public FooSessionState merge(FooSessionState a, FooSessionState b) {
if ( a.getSessionCreationTime() != null) {
b.setSessionCreationTime(a.getSessionCreationTime());
}
return b;
}
}
我的计划是按如下方式使用它:
stream.keyBy(new FooMatchKeySelector())
.window(EventTimeSessionWindows.withGap(Time.milliseconds(config.getFooSessionWindowTimeout())))
.trigger(PurgingTrigger.of(CountTrigger.of(1L)))
.aggregate(new SessionIdAssigner())
我认为会话窗口不适合您想要实现的目标。它们被设计为每个会话聚合事件,但不是丰富每个事件,也就是说,它们计算结果并在窗口关闭时发出结果。正如您所注意到的,会话窗口通过为每个事件创建一个新窗口并合并重叠的窗口来工作。之所以选择这种设计,是因为事件可能会无序到达。因此,可能会出现这样的情况:您有两个窗口,稍后通过桥接事件连接。
我建议使用一个收集事件并对其时间戳进行排序的ProcessFunction
来实现逻辑。当接收到水印时,它会发出带有正确会话ID的所有收集到的事件。因此,您只保留两个水印之间的事件状态。除了这些事件之外,您还需要保留最后发出的事件的时间戳和最后发出的会话ID来执行正确的会话化。
有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的? 例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。 如果我说的不对,请纠正。 上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果? DTO: ====
你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点
我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?
我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?
从flink办公室引入会话窗口 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-窗口。。。会话窗口操作符为每个到达的记录创建一个新窗口,如果窗口之间的距离比定义的间隙更近,则将窗口合并在一起。为了可合并,会话窗口操作符需要合并触发器和合并窗口函数,。。。
我有一个由两个字段“键控”的记录流,然后分配一个间隔为30秒的会话窗口。我使用附加在记录上的“时间戳”作为事件时间。我正在使用“Assign AscendingTimeStamps”水印。 以下面的记录为例。该流由(用户,place)键控。 Record1:user1,place1,timestamp t1 Record2:user2,place1,timestamp在t1之后30秒 桶1 Rec