当前位置: 首页 > 知识库问答 >
问题:

对流的联合进行排序以识别Apache Flink中的用户会话

陆飞鸿
2023-03-14

我有两个事件流

  • L=(l1, l3, l8,...)-更稀疏,表示用户登录IP
  • E=(e2, e4, e5, e9,...)-是特定IP的日志流

较低的索引表示时间戳...如果我们将两个流连接在一起并按时间对它们进行排序,我们将得到:

  • l1, e2, l3, e4, e5, l8, e9,...

能否实现自定义的< code > Window /< code > Trigger 函数来将事件分组到会话(不同用户登录之间的时间):

    < li>l1 - l3 : e2 < li>l3 - l8 : e4,e5 < li>l8 - l14 : e9,e10,e11,e12,e13 <李>...

我看到的问题是,这两个流不一定是排序的。我考虑过按时间戳对输入流进行排序。然后,使用全局窗口和自定义触发器可以很容易地实现窗口-但这似乎是不可能的。

在当前的Flink(v1.3.2)中,我是否遗漏了一些东西,或者绝对不可能这样做?

谢啦

共有1个答案

邵耀
2023-03-14

问:E3不应该在L4之前吗?

使用< code>ProcessFunction排序非常简单。大概是这样的:

public static class SortFunction extends ProcessFunction<Event, Event> {
  private ValueState<PriorityQueue<Event>> queueState = null;

  @Override
  public void open(Configuration config) {
    ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
        // state name
        "sorted-events",
        // type information of state
        TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
        }));
    queueState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
    TimerService timerService = context.timerService();

    if (context.timestamp() > timerService.currentWatermark()) {
      PriorityQueue<Event> queue = queueState.value();
      if (queue == null) {
        queue = new PriorityQueue<>(10);
      }
      queue.add(event);
      queueState.update(queue);
      timerService.registerEventTimeTimer(event.timestamp);
    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
    PriorityQueue<Event> queue = queueState.value();
    Long watermark = context.timerService().currentWatermark();
    Event head = queue.peek();
    while (head != null && head.timestamp <= watermark) {
      out.collect(head);
      queue.remove(head);
      head = queue.peek();
    }
  }
}

更新:有关通常更好的方法的描述,请参阅如何使用Flink对乱序事件时间流进行排序。

 类似资料:
  • flink流有多个数据流,然后我将这些数据流与org合并。阿帕奇。Flink。流式处理。api。数据流。DataStream#联合方法。然后,我得到了问题,数据流是无序的,我不能设置窗口来排序数据流中的数据。 在Apache Flink中对流的联合进行排序以识别用户会话 我得到了答案,但是com。利亚姆。学Flink。实例协会UnionStreamDemo。从未调用SortFunction#onT

  • 我对从流中排序列表感兴趣。这是我正在使用的代码: 我是不是漏了什么?之后不会对列表进行排序。 它应该根据最低值的项目对列表进行排序。 以及打印方法:

  • 我想像下面这样对流进行反向排序,但是编译时错误为。有人能纠正这个吗

  • 问题内容: 在我正在使用的代码下面,可以正常工作并输出名称,但不能使用sort方法。我期望“ Collections.sort(nameFromText);” 按名字的字母顺序对ArrayList进行排序。 我究竟做错了什么? 问题答案: 方法期望要排序的列表元素具有可比性。元素类型应该实现接口,或者您应该使用带有通用实例的重载方法。 在下面的代码中,您不满足上述两个条件。您的类既没有实现,也没有

  • 我试图写一个函数来排序一个对象集合。由于对象都是相同的类型(相同的用户定义类),因此它们的属性集是相同的。是否有可能(通过代码)发现对象的属性,以便将集合放在一个二维数组中,每行代表一个对象,每列代表它的一个属性? 另一种解决方案是将集合中的每个对象复制到对象数组中,并根据它们的一个属性对它们进行排序,该属性的名称作为字符串传递给函数。但是我不知道如何使用作为字符串传递的属性名来指向对象的属性。

  • 问题内容: 我有这种格式的数组: 如何按该字段的降序对该格式的数组排序?是否有内置功能? 问题答案: 使用并提供您自己的功能进行订购,例如