当前位置: 首页 > 知识库问答 >
问题:

Flink流窗口触发器

戚阳文
2023-03-14

我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。

这里发生的事情是,它给我的结果,我以前的窗口聚合以及。

假设前30秒我得到结果10。

接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。

因此,我的问题是如何为每个窗口获得新的结果。

共有3个答案

楚丰羽
2023-03-14

我在这个问题上有点晚了,但是我在OP上遇到了同样的问题。我后来发现我自己的代码中有一个bug。仅供参考,我的错误可以作为解决您问题的参考。

// Old code (modified to be an example):
val tenSecondGrouping: DataStream[MyCustomGrouping] = userIdsStream
      .keyBy(_.somePartitionedKey)
      .window(TumblingProcessingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
      .trigger(ProcessingTimeTrigger.create())
      .aggregate(new MyCustomAggregateFunc(new MyCustomGrouping()))

错误发生在新的MyCustomGrouping上:我无意中创建了一个singleton MyCustomGrouping对象,并在MyCustomAggregateFunc中重用它。随着更多滚动窗口的创建,后期的聚合结果变得疯狂!修复方法是在每次触发MyCustomAggregateFunc时创建新的MyCustomGrouping。因此:

// New code, problem solved
          ...
          .aggregate(new MyCustomAggregateFunc(() => new MyCustomGrouping())) 
// passing in a func to create new object per trigger
姚正真
2023-03-14

您描述的功能可以在翻滚窗口中找到:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#tumbling-windows

更详细一点和/或代码会有所帮助:)

公孙栋
2023-03-14

您需要使用清除触发器。你想要的是FIRE_AND_PURGE(发射和删除窗口内容),默认的flink触发器所做的是FIRE(发射和保留窗口内容)。

input
    .keyBy(...)
    .timeWindow(Time.seconds(30))

    // The important part: Replace the default non-purging ProcessingTimeTrigger
    .trigger(new PurgingTrigger[..., TimeWindow](ProcessingTimeTrigger))

    .reduce(...)

要获得更深入的解释,请查看触发器和FIRE与FIRE_和_PURGE的对比。

触发器确定窗口(由窗口赋值器形成)何时准备好由窗口函数处理。每个WindowAssigner都带有一个默认触发器。如果默认触发器不符合您的需要,可以使用触发器(…)指定自定义触发器。

当触发器触发时,它可以触发或FIRE_AND_PURGE。当FIRE保留窗口内容时,FIRE_AND_PURGE删除其内容。默认情况下,预实现的触发器只是FIRE而不清除窗口状态。

 类似资料:
  • 我正在尝试加入apache flink中的两个流以获得一些结果。 我的项目的当前状态是,我正在获取twitter数据并将其映射到一个2元组中,其中保存用户的语言和定义时间窗口中的推文总和。我这样做是为了每种语言的推文数量和每种语言的转发。推文/转发聚合在其他进程中运行良好。 我现在想得到一个时间窗口内转发次数占所有推文次数的百分比。 因此我使用以下代码: 当我打印或时,输出似乎很好。我的问题是我从

  • 我正在尝试在我的Flink作业中使用事件时间,并使用来提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得中的根本没有调用。我可以看到数据进入函数。 我已经设置了getEnv()。getConfig()。设置自动水印间隔(1000L) 我尝试过 还有会话窗口 所有的水印都显示没有水印,我怎么能让Flink忽略这个没有水印的东西呢?

  • 我想记录从火花结构化流的传入流读取到数据库的记录数。我正在使用ForeachBatch来转换传入的流批处理和写入所需的位置。我想日志0记录读取,如果在一个特定的小时没有记录。但是当没有流时,Foreach批处理不会执行。有人能帮我吗?我的代码如下:

  • 我正在使用翻滚窗口(5分钟)和,因为我的源代码来自Kafka。但是窗口总是运行超过5分钟。有人能建议吗?

  • 问题陈述:来自kafka源的流式事件。这些事件有效载荷为字符串格式。将它们解析为文档,并根据事件时间每隔5秒将其批量插入DB。 函数正在执行。但程序控制不会进入。因此不会发生批量插入。我尝试了键控和非键控窗口。它们都不工作。没有抛出错误。 flink版本:1.15.0 下面是我的主要方法的代码。我应该如何解决这个问题?

  • 当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?