当前位置: 首页 > 知识库问答 >
问题:

生成“心跳”类型的事件以向前推送事件时间

许马鲁
2023-03-14

我正在构建一个Flink流媒体应用程序,并且更喜欢使用事件时间,因为它将确保在历史数据出现故障或重播时,所有设置的计时器都将决定性地触发。事件时间的问题在于,只有当事件发生时,时间才会向前移动。我们的数据源(物理传感器)有时产生的数据很少,因此有时单个数据点可能会打开一个5分钟的聚合窗口,但下一个数据点会在20分钟后关闭,因此窗口会很晚关闭并发出输出记录。

我们提出的解决方案是使用AWS lambda函数,该函数计划每X分钟运行一次,将一个虚拟事件输出到Flink读取的Kinesis流中,从而强制生成一个水印,使时间提前。

我担心的是,只有当水印是真正全局的时,这才有效,这意味着单个心跳消息可以导致创建一个水印,该水印可以提前Flink应用程序中每个操作员/任务的事件时间,该应用程序使用源自此流的数据。这些文档让我相信,Flink将从一个源进行parellizes读取,其中每个并行读取操作符生成自己的水印,然后一个下游操作符(比如一个窗口)获取它所看到的各种水印的最小值。如果是这种情况,这对我来说似乎有问题,因为每个并行水印生成器都需要一个虚拟心跳事件,但我无法控制哪些节点从流中读取心跳消息。

所以,我的问题是,下游运营商如何准确地使用水印来提前事件时间,是否可以向kinesis流中添加一条伪消息来提前整个Flink应用程序的事件时间?

如果没有,我如何强制活动时间向前移动?

共有1个答案

柯浩壤
2023-03-14

你是对的;这里有一个问题。boundedAutofordernessTimestampExtractor实现的标准周期性水印生成器依赖于看到时间戳较大的新事件来推进水印。

有几种方法可以解决这个问题:

>

  • 在并行度为1的任务中运行源和水印赋值器(如果需要,然后增加管道其余部分的并行度)。这样一条心跳信号就足够了。

    广播心跳信息。这样,每个并行实例都将接收它们,它们都可以推进水印。

    代替心跳消息,实现水印生成器,该生成器使用流转时长计时器人为地推进水印,尽管缺乏传入事件。https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java一个例子。

    请注意,第三种方法不太理想,因为它与流转时长耦合,消除了纯事件时间方法的一些核心优势。

    如果使用心跳信号源,则需要为另一个(有时为空闲)返回MAX_水印的源实现水印生成器。否则,来自该流的水印将保留整个水印。

    此外,AWS Lambda感觉有点过分。您可以实现一个简单的自定义Flink源来创建心跳事件。

  •  类似资料:
    • 我已经阅读了Github Actions留档,无法弄清楚为什么当我发布新版本时,GitHub Action工作流会被几乎同时的和事件触发。换句话说,我的工作流yaml文件是这样开始的,指定它应该为推送和发布事件运行: 当我发布一个新版本(标签)时,我希望它执行一次。然而,我看到两个事件几乎同时触发(间隔约5-10秒): 现在,事件确实触发了我的存储库的,但唯一正在更新的文件是我的文件。事件的定义没

    • 事件 事件:用户与浏览器之间特定的交互瞬间。 事件类型 web浏览器发生的事件有很多类型,不同的类型有着不同的信息。"DOM3级事件"包含有:UI事件、焦点事件、滚轮事件、文本事件、鼠标事件、键盘事件、合成事件、变动事件、变动名称事件等。 UI事件 UI事件指的是那些 不一定与用户操作有关 的事件。DOM规范中留下向后兼容。UI事件包含: load:当页面加载后在window上面触发,当所有框架加

    • 目前BindingX支持四种能力。每一种能力都对应一个EventType,在进行bind的时候,选择不同的EventType即可。 EventType 说明 pan 监听手势的pan事件 timing 监听时间变化,用来实现动画 scroll 监听滚动容器的onScroll事件 orientation 监听设备方向变化,与web DeviceOrientation一致 每一种EventType都提

    • 在1.7.10中是否有一个事件用于当一个块生成时,所以我可以在它上面放置一些东西。还是我一定要在这一代人中做到这一点? 我已经上网了,但我找不到 事件之类的。

    • http://Socket.io允许你触发或响应自定义的事件,除了connect,message,disconnect这些事件的名字不能使用之外,你可以触发任何自定义的事件名称。 服务器端 // 注意,io(<端口号>) 将为你创建一个http服务。 var io = require('socket.io')(80); io.on('connection', function (socket)

    • 主要内容:值,child_added,child_changed,child_removedFirebase提供了几种不同的事件类型来读取数据。 下图显示了想要读取的数据 - 下面介绍一些最常用的方法。 值 第一个事件类型是值。 我们在上一章已经演示了如何使用值。 每次数据更改时都会触发此事件类型,并且将检索包括子项在内的所有数据。 child_added 这个事件类型将被每个运动员触发一次,每次新运动员被添加到数据中。 读取列表数据非常有用,因为我们可以从列表中获得添加的运动员和前一个