当前位置: 首页 > 知识库问答 >
问题:

用于非连续数据的Flink和Kinesis流应用程序

方鸿羲
2023-03-14

我们已经构建了一个Flink应用程序来处理来自动觉流的数据。应用程序的执行流程包含基于注册类型过滤数据、基于事件时间戳分配水印的基本操作,以及应用于5分钟数据窗口的映射、处理和聚合功能,如下所示:

    final SingleOutputStreamOperator<Object> inputStream = env.addSource(consumer)
            .setParallelism(..)
            .filter(..)
            .assignTimestampsAndWatermarks(..);

    // Processing flow
    inputStream
            .map(..)
            .keyBy(..)
            .window(..)
            .sideOutputLateData(outputTag)
            .aggregate(aggregateFunction, processWindowFunction);

    // store processed data to external storage
    AsyncDataStream.unorderedWait(...);

我的水印分配程序的参考代码:

    @Override
public void onEvent(@NonNull final MetricSegment metricSegment,
                    final long eventTimestamp,
                    @NonNull final WatermarkOutput watermarkOutput) {
    if (eventTimestamp > eventMaxTimestamp) {
        currentMaxTimestamp = Instant.now().toEpochMilli();
    }
    eventMaxTimestamp = Math.max(eventMaxTimestamp, eventTimestamp);
}

@Override
public void onPeriodicEmit(@NonNull final WatermarkOutput watermarkOutput) {
    final Instant maxEventTimestamp = Instant.ofEpochMilli(eventMaxTimestamp);
    final Duration timeElaspsed = Duration.between(Instant.ofEpochMilli(lastCurrentTimestamp), Instant.now());
    if (timeElaspsed.getSeconds() >= emitWatermarkIntervalSec) {
        final long watermarkTimestamp = maxEventTimestamp.plus(1, ChronoUnit.MINUTES).toEpochMilli();
        watermarkOutput.emitWatermark(new Watermark(watermarkTimestamp));
    }
}

现在,这个应用程序的性能很好(在几秒钟内的延迟方面),早就有了。然而,最近在上游系统post中发生了变化,其中Kinesis流中的数据以突发方式发布到流中(每天仅2-3小时)。在这一变化之后,我们看到我们的应用程序的延迟出现了巨大的峰值(使用flinkgauge方法测量,方法是在first filter方法中记录开始时间,然后在Async方法中通过计算从开始时间映射到该点的时间戳中的差异来发出度量)。想知道对于突发流量/非连续数据流,将Flink应用程序与Kinesis流一起使用是否存在任何问题?


共有1个答案

刁璞
2023-03-14

由于输入流现在长时间处于空闲状态,这可能会造成水印被挂起的情况。如果是这种情况,那么我预计延迟会有很大的变化,因为它(可能)只是每个突发的最终窗口,其结果会延迟到下一个突发的到来。

 类似资料:
  • 我正在测试Apache Flink(使用v1.8.2)从Kinesis Data Stream读取消息的速度。Kinesis Data Streams仅包含一个分片,它包含40,000条消息。每个消息大小小于5 KB。 尝试使用TRIM\u HORIZON从最旧的消息中读取流,我希望该应用程序能够快速读取所有消息,因为通过GetRecords,每个碎片可以支持高达每秒2 MB的最大总数据读取速率。

  • 我使用Flink与运动源和事件时间键控窗口。应用程序将监听实时数据流,窗口化(事件时间窗口)并处理每个键控流。我有另一个用例,我也需要能够支持某些关键流的旧数据回填(这些将是具有事件时间的新关键流 鉴于我正在使用水印,这是一个问题,因为Flink不支持每键水印。因此,回填的任何键控流将最终被忽略,因为此流的事件时间将是 我经历了其他类似的问题,但没能得到一个可能的方法。以下是我正在考虑的可能方法,

  • 在我当前的项目中,我的目标是从帧流中检测不同的对象。视频帧是用与覆盆子PI连接的摄像机拍摄的。 体系结构设计如下: > 代码正在raspberry PI上运行。此代码将图像流发送到AWS中的Kinesis数据流(称为)。 Lambda函数() 以下是Kinesis数据流日志(日期为2019年8月17日-IST下午1:54)。最后一次,2019年8月16日通过覆盆子PI摄取的数据-下午6:45)

  • Serverless 适合用于事件驱动型应用,以及定时任务。今天,让我们来看看一个事件驱动的例子。 在之前的那篇《Serverless 应用开发指南:CRON 定时执行 Lambda 任务》中,我们介绍了如何调度的示例。 最初我想的是通过 Lambda + DynamoDB 来自定义数据格式,后来发现使用 Kinesis Streams 是一种更简单的方案。 Amazon Kinesis Stre

  • 我有一个Flink应用程序在Amazon的Kinesis数据分析服务(托管Flink集群)中运行。在应用程序中,我从Kinesis流keyBy userId读取用户数据,然后聚合一些用户信息。问了这个问题之后,我了解到Flink会在集群中跨物理主机拆分一个流的读取。然后Flink将传入事件转发给主机,该主机将聚合器任务分配给对应于给定事件的密钥空间。 考虑到这一点,我试图决定使用什么作为我的Fli

  • 问题:Flink应用程序未接收和处理Kinesis连接器在关闭时生成的事件(由于重新启动) 我们有以下Flink环境设置 动力系统有以下初始配置 有趣的是,当我更改运动配置以回复事件时,即。 Flink正在从Kinesis接收所有缓冲记录(这包括在事件Flink应用程序关闭之前、期间和之后生成的事件)并对其进行处理。因此,此行为违反了Flink应用程序的“恰好一次”属性。 有人能指出我遗漏的一些明