当前位置: 首页 > 知识库问答 >
问题:

Flink检查点的大小增长超过20GB,检查点的时间超过1分钟

汪翰墨
2023-03-14

首先也是最重要的:

    null

现在,这项工作一直很好,直到上周,我们有一个激增(10倍以上)的流量。从那时候起,Flink变成了香蕉。检查点大小开始从500MB缓慢增长到20GB,检查点时间大约需要1分钟,并且随着时间的推移而增长。应用程序开始失败,并且永远无法完全恢复,事件迭代器的年龄增长也永远不会下降,因此没有新的事件被消耗。

因为我是一个新的闪现,我不确定我做滑动计数的方式是不是完全没有优化或明显的错误。

这是代码关键部分的一个小片段:

SourceFunction<Event> source =
      new FlinkKinesisConsumer<>("input-kinesis-stream", new MyJsonDeserializationSchema(), kinesisConsumerConfig);
public class Event implements Serializable {
  public String entityId;
  public String entityType;
  public String entityName;
  public long eventTimestamp = System.currentTimeMillis();
}

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> eventsStream = kinesis
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
        @Override
        public long extractTimestamp(Event event) {
          return event.eventTimestamp;
        }
      })

DataStream<Event> fooStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "foo".equalsIgnoreCase(event.entityType);
        }
      })

 DataStream<Event> barStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "bar".equalsIgnoreCase(event.entityType);
        }
      })


StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Table fooTable = tEnv.fromDataStream("fooStream, entityId, entityName, entityType, eventTimestame.rowtime");
    tEnv.registerTable("Foo", fooTable);
    Table barTable = tEnv.fromDataStream("barStream, entityId, entityName, entityType, eventTimestame.rowtime");
    tEnv.registerTable("Bar", barTable);

Table slidingFooCountTable = fooTable
      .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingFooId, entityid as slidingFooEntityid, entityName as slidingFooEntityName, entityType.count as slidingFooCount, minuteWindow.rowtime as slidingFooMinute");

Table slidingBarCountTable = barTable
      .window(Slide.over("24.hout").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingBarId, entityid as slidingBarEntityid, entityName as slidingBarEntityName, entityType.count as slidingBarCount, minuteWindow.rowtime as slidingBarMinute");

    Table tumblingFooCountTable = fooTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityid, entityName, minuteWindow")
      .select("concat(concat(entityName,'_'), entityName) as tumblingFooId, entityId as tumblingFooEntityId, entityNamae as tumblingFooEntityName, entityType.count as tumblingFooCount, minuteWindow.rowtime as tumblingFooMinute");
   
    Table tumblingBarCountTable = barTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityid, entityName, minuteWindow")
      .select("concat(concat(entityName,'_'), entityName) as tumblingBarId, entityId as tumblingBarEntityId, entityNamae as tumblingBarEntityName, entityType.count as tumblingBarCount, minuteWindow.rowtime as tumblingBarMinute");

    Table aggregatedTable = slidingFooCountTable
      .leftOuterJoin(slidingBarCountTable, "slidingFooId = slidingBarId && slidingFooMinute = slidingBarMinute")
      .leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingBarId && slidingFooMinute = tumblingBarMinute")
      .leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingFooId && slidingFooMinute = tumblingFooMinute")
      .select("slidingFooMinute as timestamp, slidingFooCreativeId as entityId, slidingFooEntityName as entityName, slidingFooCount, slidingBarCount, tumblingFooCount, tumblingBarCount");

    DataStream<Result> result = tEnv.toAppendStream(aggregatedTable, Result.class);
    result.addSink(sink); // write to an output stream to be picked up by a lambda function

我在Stackoverflow@Davidanderson的某个地方读到,建议使用映射状态创建我们自己的滑动窗口,并按时间戳对事件进行切片。然而,我并不完全确定这意味着什么,我也没有找到任何代码示例来展示它。

共有1个答案

谭玄天
2023-03-14

你在里面创造了很多窗口。如果您正在创建一个大小为24h和滑动5分钟的滑动窗口,这意味着其中将有很多打开的窗口,所以您可能会期望您在给定的一天中接收到的所有数据将在至少一个窗口中检查点,如果您仔细考虑的话。因此,可以肯定的是,检查点的大小和时间将随着数据本身的增长而增长。

为了能够得到答案,如果代码可以重写,您需要提供更多的细节,说明您在这里到底想要实现什么。

 类似资料:
  • 源在内存中创建任意数量的事件,每秒吞吐量为1个事件。每个事件都有用于分区流的唯一id(使用keyBy运算符),并通过映射函数向托管状态(使用ValueState)添加约100KB。然后将事件简单地传递给不执行任何操作的接收器。 使用上面描述的设置,我们发送了1200个事件,检查点间隔和最小暂停设置为5秒。当事件以恒定的速度和相等的状态量出现时,我们期望检查点的大小或多或少是恒定的。然而,我们观察到

  • 我在我的Flink应用程序(版本1.11.1)中使用事件时语义,该应用程序运行在AWS-kinesis analytics中。此应用程序的源为kinesis stream,汇为Postgres。notifyCheckpointComplete()上触发DB接收器时,检查点间隔为10秒。我正在使用多个协处理函数和ValueState连接不同的流,然后再将其放入Postgres。 观察到,检查点数据大

  • 我们正在接收来自多个独立数据源的事件,因此,到达我们Flink拓扑(通过Kafka)的数据将是无序的。 我们正在Flink拓扑中创建1分钟的事件时间窗口,并在源操作符处生成事件时间水印(当前事件时间-某些阈值(30秒))。 如果一些事件在设置的阈值之后到达,那么这些事件将被忽略(在我们的例子中这是可以的,因为属于该分钟的大多数事件都已经到达并在相应的窗口中得到处理)。 现在的问题是,如果程序崩溃(

  • 下面是我对Flink的疑问。 我们可以将检查点和保存点存储到RockDB等外部数据结构中吗?还是只是我们可以存储在RockDB等中的状态。 状态后端是否会影响检查点?如果是,以何种方式? 什么是状态处理器API?它与我们存储的保存点和检查点直接相关吗?状态处理器API提供了普通保存点无法提供的哪些额外好处? 对于3个问题,请尽可能描述性地回答。我对学习状态处理器API很感兴趣,但我想深入了解它的应

  • 我有以下CEP PatternStream,其中数据流是基于实体ID分区的,因为只有实体具有相同的实体ID时,我才对模式匹配感兴趣: 但随后我注意到检查点状态大小随着实体ID数量的增加而增加。如果我对检查点的理解是正确的,这是意料之中的,因为运算符状态的数量会增加。但我想弄清楚是否有其他方法可以最小化检查点状态大小。 > 有没有不同的方法来实现这种模式匹配,而不根据实体ID对数据流进行分区?

  • 我知道stackoverflow上也有类似的问题,但在调查了其中几个之后,我知道 > 他们正在使用不同的存储格式 但这些并不是令人困惑的地方,我不知道什么时候该用一个,什么时候该用另一个。 考虑以下两种情况: 如果由于某种原因(例如错误修复或意外崩溃)需要关闭或重新启动整个应用程序,那么我必须使用保存点来恢复整个应用程序