当前位置: 首页 > 知识库问答 >
问题:

了解Flink时间窗口连接中的延迟和水印

沃宇
2023-03-14

我正在尝试加入Flink中的两种类型(比如A和B)。我想确认我的理解是否正确。事件的某些属性-

  1. 事件A立即流入flink,延迟几分钟(5-10分钟)
  2. 事件B以15-30分钟的轻微延迟流动
  3. 事件a和事件B之间存在1:1连接

我已将事件A的数据流配置为10分钟的BoundedAutofordernessTimestampExtractor,将事件B的数据流配置为30分钟。稍后,我使用表API进行时间窗口连接。

我对以下内容的理解正确吗-

  1. 只要事件在延迟时间窗口内(事件A为10分钟,事件B为30分钟),则在收到事件后立即对其进行处理和加入。由于Flink的任何配置,端到端延迟没有最小限制

关于下面的代码还有其他建议吗?

queryConfig.withIdleStateRetentionTime(
    org.apache.flink.api.common.time.Time.seconds(1),
    org.apache.flink.api.common.time.Time.minutes(30))

val stream: DataStream[Any] = textStream.flatMap(json => convert(json))

val aStream: DataStream[ClassA] =
    stream
        .filter(obj => obj.isInstanceOf[ClassA])
        .rebalance
        .map(obj => obj.asInstanceOf[ClassA])
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[ClassA](
                Time.minutes(10)){
                override def extractTimestamp(element: ClassA): Long =
                    element.serviceTimestamp.toInstant.toEpochMilli
            })

val bStream: DataStream[ClassB] =
    stream
            .filter(obj => obj.isInstanceOf[ClassB])
            .rebalance
            .map(obj => obj.asInstanceOf[ClassB])
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor[ClassB](
                    Time.minutes(30)){
                    override def extractTimestamp(element: ClassB): Long =
                        element.timestamp.toInstant.toEpochMilli
                })

val aTable: Table  = tableEnv.fromDataStream[ClassA](aStream,
    // The .rowtime is for setting event time attributes
    'aTimestamp.rowtime as 'aTimestamp, 'aUniqueId, 'aItem)

val bTable: Table  = tableEnv.fromDataStream[ClassB](bStream,
    // The .rowtime is for setting event time attributes
    // https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
    'bTimestamp.rowtime as 'bTimestamp, 'uniqueId, 'bItem)

val result: Table = aTable
        .join(aTable)
        .where('aUniqueId === 'uniqueId
                // Give ClassB events 30 minutes lateness.
                // Use a time window join as optimization - https://stackoverflow.com/a/51620821
                // & https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#time-windowed-joins
                // Both time clauses are need to qualify as time window join
                && 'bTimestamp >= 'aTimestamp
                && 'bTimestamp <= 'aTimestamp + 30.minutes)
        // DO NOT change order without changing order in later parsing code
        .select('uniqueId, 'aItem, 'bItem, 'bTimestamp, 'aTimestamp.cast(createTypeInformation[Timestamp]))

val outputStream: DataStream[ClassC]  = tableEnv
                .toAppendStream[(String, String, String, Timestamp, Timestamp)](result)
                // TODO find better way to map to a POJO
                .map(row => ClassCUtils.toClassC(row))

共有1个答案

拓拔高畅
2023-03-14

只要事件在延迟时间窗口内(事件A为10分钟,事件B为30分钟),则在收到事件后立即对其进行处理和加入。由于Flink的任何配置,端到端延迟没有最小限制。

没错。事件将在接收时进行映射和过滤,并放入缓冲区以满足加入窗口的要求。

该表将保留事件最多30分钟,直到水印从两条流到达。之后根据水印清除事件

没错。IntervalJoinOP将从连接的右侧和左侧接收事件,检查它们是否在时间范围内,如果在时间范围内,则向下游发出它们:

private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {

        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

下面代码中的查询配置是多余的,实际上并不需要

没错<当您使用无界运算符时,例如SQL中不带windows属性的GROUP BY子句,code>WithildRestateTentionTime与之相关。

 类似资料:
  • 我想使用Flink流媒体以低延迟处理市场数据( 我有一组计算,每个都订阅三个流:缓慢移动的参数数据、股票价格和汇率。 例如。 Params(缓慢滴答:每天一次或两次): 资源(每秒多次滴答声): fx(每秒多次滴答声): 每当任何股票、外汇汇率或参数数据发生变化时,我都想立即计算结果并将其输出为新流。这在逻辑上可以表示为连接: 例如选择价格=(params.strike-asset.spot)*f

  • 我很难理解水印和允许迟到的概念。 以下是[邮件存档]的摘录|https://www.mail-archive.com/user@Flink。阿帕奇。组织/msg08758。html]这谈到了水印,但我还有几个问题。以下是引用的示例: 假设您有一个,具有2分钟的绑定和10分钟的翻转窗口,从12:00开始到12:10结束: 如果您具有以下流序列: 不允许迟到 当窗口操作符接收到<代码> 允许迟到3分钟

  • 我有以下用例,如果有明显的解决方案,很抱歉,但我对Flink非常陌生: 谢谢

  • 使用翻滚窗口的apache flink应用程序遇到问题。窗口大小是10秒,我希望每隔10秒有一个resultSet数据流。然而,当最新窗口的结果集总是延迟时,除非我将更多数据推送到源流。 例如,如果我在“01:33:40.0”和“01:34:00.0”之间将多条记录推送到源流,然后停止查看日志,则不会发生任何事情。 我在“01:37:XX”上再次推送一些数据,然后将在“01:33:40.0”和“0

  • null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有

  • 我正在使用翻滚窗口(5分钟)和,因为我的源代码来自Kafka。但是窗口总是运行超过5分钟。有人能建议吗?