当前位置: 首页 > 知识库问答 >
问题:

Apache对水印空闲的模糊理解及其与有限持续时间和窗口持续时间的关系

蒋寒
2023-03-14

我有一个配置了Kafka连接器的Flink管道。

我已使用以下方法将水印生成频率设置为2秒:

env.getConfig().setAutoWatermarkInterval(2000);

现在,对于流窗口,我的翻滚窗口是60秒,我们在其中进行一些聚合,并且基于一个数据字段的时间戳进行基于事件时间的处理。

我没有为我的水印策略或我的流配置允许延迟。

final ConnectorConfig topicConfig = config.forTopic("mytopic");
final FlinkKafkaConsumer<MyPojo> myEvents = new FlinkKafkaConsumer<>(
        topicConfig.name(),
        AvroDeserializationSchema.forSpecific(MyPojo.class),
        topicConfig.forConsumer()
);
myEvents.setStartFromLatest();



myEvents.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<MyPojo>forBoundedOutOfOrderness(
                Duration.ofSeconds(30))
        .withIdleness(Duration.ofSeconds(120))
        .withTimestampAssigner((evt, timestamp) -> evt.event_timestamp_field));

Q.1根据我所读到的,我的时间0-60的窗口将在90秒后计算,30-90在120秒后计算,以此类推。然而,因为我们在做翻转窗口,即没有重叠,我猜没有30-90窗口,0-60后的下一个窗口是60-120,在150秒时触发,我说得对吗?

Q.2在不允许延迟的情况下,所有延迟数据都将被丢弃,例如,时间戳为45且在90秒后到达的事件被视为无序,并将超出第一个窗口,即0-60。对于窗口60-120,事件时间戳不匹配,因此它将被丢弃,不包括在以150秒标记触发的窗口中,对吗?

问题3。对于源空闲持续时间,我选择120,表示如果主题的任何Kakfa分区没有数据,则在2分钟后将其标记为空闲,然后为其他活动分区发送水印。我的问题是选择这个数字,即2分钟,以及它是否与窗口持续时间(60秒)或无序(30秒)有关。如果是这样的话,我在这里应该记住什么,以便进行适当的选择,这样我就不会因为空闲分区导致的非前进水印而导致数据延迟?

或者是120是一个太长的等待,我可能会错过数据,因此我应该将其设置为远小于OutOfOrness持续时间,以确保0数据丢失?

编辑:添加了更多的代码

共有1个答案

冯通
2023-03-14

问题1:是的,没错。

Q2:是的,这也是正确的。

Q3:这里的细节取决于您是否让Kafka源应用Watermark Strategy,在这种情况下,它将执行每个分区的水印,或者Watermark Strategy是否作为一个单独的运算符部署在源运算符之后(通常紧随其后)。

在第一种情况下(由FlinkKafkaConsumer完成每个分区的水印),您将执行以下操作:

FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>(...);

kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy ...);

DataStream<MyType> stream = env.addSource(kafkaSource);

而在源代码之后单独进行水印处理,如下所示:

DataStream<MyType> events = env.addSource(...);

DataStream<MyType> timestampedEvents = events
  .assignTimestampsAndWatermarks(
      WatermarkStrategy
        .<MyType>forBoundedOutOfOrderness(Duration ...)
        .withTimestampAssigner((event, timestamp) -> event.timestamp));

当水印在每个分区的基础上完成时,单个空闲分区将为处理该分区的使用者/源实例保留水印——直到空闲超时开始(在您的示例中为120秒)。相反,如果水印是在链接到源的单独操作符中完成的,那么只有分配给该源实例的所有分区(具有空闲分区的分区)都是空闲的,水印才会被保留(同样,保留120秒)。

但不管这些细节如何,我们都希望不会有数据丢失。将有一段时间不会触发窗口(因为水印没有前进),但事件将继续处理并分配给相应的窗口。一旦水印恢复,这些窗口将关闭并提供结果。

如果分区处于空闲状态,则会发生数据丢失,因为上游的某些故障导致中断,最终会产生一系列延迟事件。空闲超时过期后,水印将前进,如果源因上游中断而空闲(而不是因为没有事件),则最终到达的事件将延迟(除非您的无序延迟足够大以容纳它们)。如果选择忽略延迟事件,则这些事件将丢失。

 类似资料:
  • 在ISO 8601中,持续时间的格式为PT5M(5分钟)或PT2H5M(2小时5分钟)。我有一个JSON文件,其中包含这种格式的值。我想知道spark是否可以提取分钟的持续时间。我尝试将其读取为“DateType”,并使用“minutes”函数获取分钟数,结果返回空值。 示例json 目前,我正在将其作为字符串读取并使用“regex_extract”函数。我想知道一种更有效的方法。 https:/

  • 我有ISO8601格式的持续时间值,我将其转换为时间值为整数秒,如下所示: ISO8601格式的持续时间值=“P1Y”。 我将该值存储在“时间-单位-秒”中。因此,当我检索的值将是在int,我想转换回ISO8601持续时间格式,所以我应该得到“P1Y”转换后回来。 有没有快速的方法可以做到这一点?或者我必须把时间的int值转换成float,然后通过某种方法把它转换成ISO8601持续时间。

  • 问题内容: 如何获取格式为2个字符串的持续时间? 我正在尝试使用Calendar类并检查。我与此有关的问题是它不一致。知道我在做什么错吗?每次我运行该程序时,如果没有,输出40-70行到控制台。 问题答案: 那是一个已记录的错误。 尝试在设置日历之前清除日历:

  • 许多记者将显示测试持续时间,以及标记缓慢的测试,如“spec”记者所示: 要调整被认为“慢”的东西,您可以使用以下slow()方法: describe('something slow', function() { this.slow(10000); it('should take long enough for me to go make a sandwich', function() { /

  • 在ISO 8601中,持续时间的格式为<code>P[n]Y[n]M[n]DT[n]H[n]M[n]S。 例子: 20秒: 一年两个月三天四小时五分钟六秒: 问题: 给定一个包含iso 8601格式的持续时间的字符串。我想获得这段时间的总秒数。标准C 11中推荐的实现方式是什么? 备注: 例如,boost DateTime中有ptime from _ iso _ string(STD::strin

  • 在我的应用程序中,用户需要能够输入持续时间,包括分钟和秒。 我正在考虑使用Windows中的控件来更改时间,虽然我只需要几分钟和几秒钟,而且箭头也很好,但不是必需的。 JavaFX 2中是否已经有此控件 如果没有,我如何创建这样的控件 上述Windows控件的屏幕截图: 谢谢你的提示!