当前位置: 首页 > 知识库问答 >
问题:

如何在ApacheFlink中为会话窗口分配id?

叶煌
2023-03-14

如何在ApacheFlink中为会话窗口分配id?

最后,我希望在会话窗口打开时,使用会话窗口id逐个充实事件(我不希望等到窗口关闭后再发出充实事件)。

我尝试使用AggregateFunction来实现这一点,但是我认为merge()并没有像我所期望的那样工作。它似乎是用于合并窗口而不是窗格(触发触发)。在我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态!

会话窗口ID将是落入窗口的第一个事件的时间戳(由于非保证排序,这可能意味着一些事件可能会落入具有较早时间戳的同一会话窗口-我对此没意见)。

public class FooSessionState {

  private Long sessionCreationTime;

  private FooMatch lastMatch;
}

/**
 * Aggregator that assigns session ids to elements of a session window
 */
public class SessionIdAssigner implements
    AggregateFunction<FooMatch, FooSessionState, FooSessionEvent> {

  static final long serialVersionUID = 0L;

  @Override
  public FooSessionState createAccumulator() {
    return new FooSessionState();
  }

  @Override
  public FooSessionState add(FooMatch value, FooSessionState sessionState) {
    if (sessionState.getSessionCreationTime() == null) {
      sessionState.setSessionCreationTime(value.getReport().getTimestamp());
    }
    sessionState.setLastMatch(value);
    return sessionState;
  }

  @Override
  public FooSessionEvent getResult(FooSessionState accumulator) {
    FooSessionEvent sessionEvent = new FooSessionEvent();
    sessionEvent.setFooMatch(accumulator.getLastMatch());
    sessionEvent.setSessionCreationTime(accumulator.getSessionCreationTime());
    return sessionEvent;

  }

  @Override
  public FooSessionState merge(FooSessionState a, FooSessionState b) {

    if ( a.getSessionCreationTime() != null) {
      b.setSessionCreationTime(a.getSessionCreationTime());
    }

    return b;
  }
}

我的计划是按如下方式使用它:

stream.keyBy(new FooMatchKeySelector())
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(config.getFooSessionWindowTimeout())))
    .trigger(PurgingTrigger.of(CountTrigger.of(1L)))
    .aggregate(new SessionIdAssigner())

共有1个答案

全流觞
2023-03-14

我认为会话窗口不适合您想要实现的目标。它们被设计为每个会话聚合事件,但不是丰富每个事件,也就是说,它们计算结果并在窗口关闭时发出结果。正如您所注意到的,会话窗口通过为每个事件创建一个新窗口并合并重叠的窗口来工作。之所以选择这种设计,是因为事件可能会无序到达。因此,可能会出现这样的情况:您有两个窗口,稍后通过桥接事件连接。

我建议使用一个收集事件并对其时间戳进行排序的ProcessFunction来实现逻辑。当接收到水印时,它会发出带有正确会话ID的所有收集到的事件。因此,您只保留两个水印之间的事件状态。除了这些事件之外,您还需要保留最后发出的事件的时间戳和最后发出的会话ID来执行正确的会话化。

 类似资料:
  • 有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的? 例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。 如果我说的不对,请纠正。 上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果? DTO: ====

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?

  • 从flink办公室引入会话窗口 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-窗口。。。会话窗口操作符为每个到达的记录创建一个新窗口,如果窗口之间的距离比定义的间隙更近,则将窗口合并在一起。为了可合并,会话窗口操作符需要合并触发器和合并窗口函数,。。。

  • 我有一个由两个字段“键控”的记录流,然后分配一个间隔为30秒的会话窗口。我使用附加在记录上的“时间戳”作为事件时间。我正在使用“Assign AscendingTimeStamps”水印。 以下面的记录为例。该流由(用户,place)键控。 Record1:user1,place1,timestamp t1 Record2:user2,place1,timestamp在t1之后30秒 桶1 Rec