当前位置: 首页 > 知识库问答 >
问题:

ApacheFlink-如何将AssignerWithPeriodicWatermark和AssignerWithPeriodicWatermark结合起来?

司空镜
2023-03-14

Usecase:使用EventTime并从Kafka的记录中提取时间戳。

myConsumer.assignTimestampsAndWatermarks(new MyTimestampEmitter());
...
stream
        .keyBy("platform")
        .window(TumblingEventTimeWindows 5 mins))
        .aggregate(AggFunc(), WindowFunc())
        .countWindowAll(size)
        .apply(someFunc)
        .addSink(someSink);

我想要的是:Flink提取时间戳并在初始间隔(例如20秒)内为每条记录发出水印,然后它可以周期性地发出水印(例如每10秒)。

原因:如果我使用PeriodicWatermark,开始时Flink只会在一段时间间隔后发出水印,并且我的第一个窗口(5分钟)中的计数是错误的-比后续窗口中的计数大得多。我有一个解决办法,将自动水印间隔设置为100ms,但这是多余的。

目前,我必须使用AssignerWithPeriodicWatermark或AssignerWithPeriodicWatermark。我如何实施这种组合策略的方法?谢谢

共有1个答案

李嘉胜
2023-03-14

在使用水印生成器做一些不寻常的事情之前,我会仔细检查您是否正确诊断了这种情况。总的来说,事件时间窗口应该是确定性的,如果用相同的输入呈现,总是会产生相同的结果。如果第一个窗口的结果取决于生成水印的频率,这表明您可能有延迟事件,这些事件在水印更频繁到达时被删除,并且在水印不太频繁时能够被包括在内。也许你的水印没有正确地解释你的事件正在经历的混乱的实际程度?或者你的水印是基于System.currentTimeMillis(),而不是事件时间戳?

而且,第一个时间窗口与其他时间窗口不同是正常的,因为时间窗口与历元对齐,而不是与第一个事件对齐。当然,这样做的效果是,第一个窗口覆盖的时间比所有其他窗口短,因此您应该期望它包含的事件更少,而不是更多。

将setAutoWatermark Interval设置为100ms是一件非常正常的事情。但是如果你真的想避免这种情况,你可以考虑一个AsSignerAnd PunctuatedWatermark,它最初为每个事件返回水印,然后在适当的间隔后,返回水印的频率会降低。

在带标点的水印赋值器中,对每个事件都调用extractTimestamp和checkAndGetNextWatermark方法。您可以在赋值器中使用一些瞬态(非flink)状态来跟踪第一个事件的时间,或者对事件进行计数,并在checkAndGetNextWatermark中使用该信息最终退出并停止为每个事件生成水印(有时通过从checkAndGetNextWatermark返回null,而不是水印)。每当重新启动应用程序时,它将始终恢复为为每个事件生成水印。

这不会产生一个具有周期和标点赋值器所有特征的赋值器,它只是一个自适应标点赋值器。

 类似资料:
  • 我在UCF-101数据集上训练了SVM、CNN和ANN,SVM和ANN分别使用Hue和LBP特征的CSV文件,而CNN使用LBP图像进行分类训练。现在我想结合{SVM和CNN}和{ANN和CNN}。可以这样做吗?如果可以,怎么做。 我已经为Dataset中的每个视频提取了第1关键帧,然后计算了它的LBP直方图。将其用作图像的特征,将其连同标签一起写入csv(我在101个可用的类中只选择了5个类的数

  • 我想在React项目的后端合并一个使用语音识别的Python文件。 这里我有一段使用语音识别的Python代码: 我尝试在react组件中导入文件,如下所示: 在这里,我尝试将我的按钮链接到Python文件中的函数: 我收到了它未能编译的消息以及以下内容: 我能做些什么来使这个工作?

  • 假设存在包含方法的接口: 实现combinedCall方法的最佳方法是什么: 从makeHttpCall获取数据 使用store InDatabase存储它 返回在store InDatabase完成时完成的完成? 似乎在RxJava 1.0中可以执行Completable.merge(可观察),但合并似乎不再接受可观察。

  • 我正在尝试使用gulp-watch和gulp-inject来构建我的节点Web应用程序。然而,一旦gulp-watch参与进来,涉及gulp-inject的构建步骤似乎就不起作用了。原因似乎是流永远不会结束,gulp-inject不知道何时开始。 我的gulpfile如下所示:

  • 我有一个带有一些可选字段和该类型变量的TypeScript接口: 我想将放入同名变量中。

  • 我写了一个查找查询,它的工作,查找查询返回记录的名称和级别存在 现在想把它和下面的代码结合起来,下面的代码也可以工作,但是需要和上面的代码合并来提取正确的数据 find查询返回name和level存在的记录,但我需要使用名为honors的新列来增强结果,根据级别是否为gte(大于或等于8),显示True或False 所以我基本上是在尝试结合上面的查找过滤器和$cond函数(我在这里找到并修改了示例