当前位置: 首页 > 知识库问答 >
问题:

Apache flink-早期触发窗口实现问题-收到重复元素

亢建木
2023-03-14

我很难理解flink窗口原则,如果你能给我指明正确的方向,我将非常高兴。

我的目的是计算一个时间间隔内重复发生的事件的数量,如果重复发生的事件的数量大于阈值,则生成警报事件。

我正在使用事件时语义和自定义触发器的问题/疑问。

您可以在GIST中找到实际的实现:https://gist.github.com/simpleusr/7C56D4384F6FC9F0A61860A680BB5F36

我使用键控状态来跟踪窗口中的元素计数EncounteredelementsCountState

问题是,我不得不在不明白原因的情况下插入下面的检查代码。因为以前收集的元素再次提供给onelement方法:

if (ctx.getCurrentWatermark() < 0) {
            logger.debug(String.format("onElement processing skipped for eventId : %s for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark()));
            return TriggerResult.CONTINUE;
        }

我想不出原因。我看到的是,当发生这种情况时,水印值为(ctx.getCurrentWatermark())long.min_value(这导致了上面的检查)。怎么会这样呢?

这种检查似乎避免了重复的早期事件生成,但我不知道为什么会发生这种情况,这种变通方法是否合适。

你能告诉我为什么相同的元素在窗口中被处理两次吗?

另一个问题是关于键控状态的用法。此实现在释放窗口后是否泄漏任何状态?我正在尝试清除触发器的clear方法中所有使用的状态,但这是否足够?

问候。

共有1个答案

百里杰
2023-03-14

每个任务都将currentWatermark初始化为long.min_value,在该任务的所有输入流中都有较大的水印到达之前,该值保持为currentWatermark的本地值。希望知道这将帮助你更好地理解发生了什么。

就其价值而言,通常使用ProcessFunction实现这种逻辑比使用Window API更简单。

 类似资料:
  • null 然而,当我运行这个管道时,我甚至在通过窗口结束之前就得到了早期结果 这可能是什么原因?

  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。 这里发生的事情是,它给我的结果,我以前的窗口聚合以及。 假设前30秒我得到结果10。 接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。 因此,我的问题是如何为每个窗口获得新的结果。

  • 我有这个脚本: HTML中的用法: 由于某些原因,它的工作,如果页面重新加载一半,但不,它没有开火,在它工作之前,所以我不知道发生了什么。我在wordpress网站上使用这个。

  • 我的申请有什么问题?

  • 我正在训练我的第一个转移学习模式(耶!)当验证损失在超过3个时期内没有变化超过0.1时,我很难让模型停止训练。 下面是相关的代码块 下面是一些日志: 问题: 为什么训练没有在第37纪元停止,因为我设置了早期停止回调来监控Valu损失 我可以做更复杂的提前终止回调吗?类似于“如果值”的东西

  • 我面临QuartzScheduler触发器定义的问题。 2019年2月28日星期四16:27:30 IST:开始时间 0/20 0 0 ? * * * Cron表达式 2019年2月28日星期四16:29:30 IST结束时间 触发器schedulerAdderTrigger=触发器生成器。newTrigger()。withIdentity(触发键)。startAt(schedulerdata.g

  • 我正在尝试在我的Flink作业中使用事件时间,并使用来提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得中的根本没有调用。我可以看到数据进入函数。 我已经设置了getEnv()。getConfig()。设置自动水印间隔(1000L) 我尝试过 还有会话窗口 所有的水印都显示没有水印,我怎么能让Flink忽略这个没有水印的东西呢?