当前位置: 首页 > 知识库问答 >
问题:

Flink:某些分区上生成的窗口丢失

解高昂
2023-03-14

我正试图编写一个小的Flink数据流,以更好地了解它的工作原理,我面临着一个奇怪的情况:每次运行它时,我都会得到不一致的输出。有时我期待的一些记录丢失了。请记住,这只是我为学习DataStream API的概念而构建的一个玩具示例。

我有一个大约7600行的CSV格式数据集,如下所示:

Date,Country,City,Specie,count,min,max,median,variance
28/06/2021,GR,Athens,no2,116,0.5,58.9,5.5,2824.39
28/06/2021,GR,Athens,wind-speed,133,0.1,11.2,3,96.69
28/06/2021,GR,Athens,dew,24,14,20,18,35.92
28/06/2021,GR,Athens,temperature,141,24.4,38.4,30.5,123.18
28/06/2021,GR,Athens,pm25,116,34,85,68,702.29

完整的数据集在这里:https://pastebin.com/rknnRnPc

没有特殊字符或引号,因此简单的字符串拆分可以很好地工作。

每个城市的日期范围从2021 06月28日到2021 10月03日。

我正在使用数据流API读取它:

最终数据流

每行都映射到一个简单的 POJO,如下所示:

public class CityMetric {

    private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    private final LocalDate localDate;
    private final String country;
    private final String city;
    private final String reading;
    private final int count;
    private final double min;
    private final double max;
    private final double median;
    private final double variance;

    private CityMetric(LocalDate localDate, String country, String city, String reading, int count, double min, double max, double median, double variance) {
        this.localDate = localDate;
        this.country = country;
        this.city = city;
        this.reading = reading;
        this.count = count;
        this.min = min;
        this.max = max;
        this.median = median;
        this.variance = variance;
    }

    public static CityMetric fromArray(String[] arr) {
        LocalDate date = LocalDate.parse(arr[0], dateFormatter);
        int count = Integer.parseInt(arr[4]);
        double min = Double.parseDouble(arr[5]);
        double max = Double.parseDouble(arr[6]);
        double median = Double.parseDouble(arr[7]);
        double variance = Double.parseDouble(arr[8]);

        return new CityMetric(date, arr[1], arr[2], arr[3], count, min, max, median, variance);
    }

    public long getTimestamp() {
        return getLocalDate()
                .atStartOfDay()
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

//getters follow

这些记录都是按日期顺序排列的,所以我用它来设置事件时间和水印:

   final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
            WatermarkStrategy.<CityMetric>forMonotonousTimestamps()  //we know they are sorted by time
                    .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());

我在Tuple4上有一个< code>StreamingFileSink来输出日期范围、城市和平均值:

  final StreamingFileSink<Tuple4<LocalDate, LocalDate, String, Double>> fileSink =
        StreamingFileSink.forRowFormat(
                new Path("airquality"),
                new SimpleStringEncoder<Tuple4<LocalDate, LocalDate, String, Double>>("UTF-8"))
            .build();

最后,我有如下数据流:

 source
        .map(s -> s.split(",")) //split the CSV row into its fields
        .filter(arr -> !arr[0].startsWith("Date")) // if it starts with Date it means it is the top header
        .map(CityMetric::fromArray)  //create the object from the fields
        .assignTimestampsAndWatermarks(cityMetricWatermarkStrategy) // we use the date as the event time
        .filter(cm -> cm.getReading().equals("pm25")) // we want air quality of fine particulate matter pm2.5
        .keyBy(CityMetric::getCity) // partition by city name
        .window(TumblingEventTimeWindows.of(Time.days(7))) //windows of 7 days
        .aggregate(new CityAverageAggregate()) // average the values
        .name("cityair")
        .addSink(fileSink); //output each partition to a file

< code > CityAverageAggregate 只是累加总和与计数,并跟踪它所覆盖范围的最早和最晚日期。

public class CityAverageAggregate
    implements AggregateFunction<
        CityMetric, CityAverageAggregate.AverageAccumulator, Tuple4<LocalDate, LocalDate, String, Double>> {

  @Override
  public AverageAccumulator createAccumulator() {
    return new AverageAccumulator();
  }

  @Override
  public AverageAccumulator add(CityMetric cityMetric, AverageAccumulator averageAccumulator) {
    return averageAccumulator.add(
        cityMetric.getCity(), cityMetric.getLocalDate(), cityMetric.getMedian());
  }

  @Override
  public Tuple4<LocalDate, LocalDate, String, Double> getResult(
      AverageAccumulator averageAccumulator) {
    return Tuple4.of(
        averageAccumulator.getStart(),
        averageAccumulator.getEnd(),
        averageAccumulator.getCity(),
        averageAccumulator.average());
  }

  @Override
  public AverageAccumulator merge(AverageAccumulator acc1, AverageAccumulator acc2) {
    return acc1.merge(acc2);
  }

  public static class AverageAccumulator {
    private final String city;
    private final LocalDate start;
    private final LocalDate end;
    private final long count;
    private final double sum;

    public AverageAccumulator() {
      city = "";
      count = 0;
      sum = 0;
      start = null;
      end = null;
    }

    AverageAccumulator(String city, LocalDate start, LocalDate end, long count, double sum) {
      this.city = city;
      this.count = count;
      this.sum = sum;
      this.start = start;
      this.end = end;
    }

    public AverageAccumulator add(String city, LocalDate eventDate, double value) {
      //make sure our dataflow is correct and we are summing data from the same city
      if (!this.city.equals("") && !this.city.equals(city)) {
        throw new IllegalArgumentException(city + " does not match " + this.city);
      }

      return new AverageAccumulator(
          city,
          earliest(this.start, eventDate),
          latest(this.end, eventDate),
          this.count + 1,
          this.sum + value);
    }

    public AverageAccumulator merge(AverageAccumulator that) {
      LocalDate mergedStart = earliest(this.start, that.start);
      LocalDate mergedEnd = latest(this.end, that.end);
      return new AverageAccumulator(
          this.city, mergedStart, mergedEnd, this.count + that.count, this.sum + that.sum);
    }

    private LocalDate earliest(LocalDate d1, LocalDate d2) {
      if (d1 == null) {
        return d2;
      } else if (d2 == null) {
        return d1;
      } else {
        return d1.isBefore(d2) ? d1 : d2;
      }
    }

    private LocalDate latest(LocalDate d1, LocalDate d2) {
      if (d1 == null) {
        return d2;
      } else if (d2 == null) {
        return d1;
      } else {
        return d1.isAfter(d2) ? d1 : d2;
      }
    }

    public double average() {
      return sum / count;
    }

    public String getCity() {
      return city;
    }

    public LocalDate getStart() {
      return start;
    }

    public LocalDate getEnd() {
      return end;
    }
  }
}

问题:

我面临的问题是,有时我没有得到我所期望的所有窗口。这并不总是发生,有时连续运行输出不同的结果,所以我怀疑有一些竞争条件。

例如,在其中一个分区文件输出中,我有时会得到:

(2021-07-12,2021-07-14,Belgrade,56.666666666666664)
(2021-07-15,2021-07-21,Belgrade,56.0)
(2021-07-22,2021-07-28,Belgrade,57.285714285714285)
(2021-07-29,2021-08-04,Belgrade,43.57142857142857)
(2021-08-05,2021-08-11,Belgrade,35.42857142857143)
(2021-08-12,2021-08-18,Belgrade,43.42857142857143)
(2021-08-19,2021-08-25,Belgrade,36.857142857142854)
(2021-08-26,2021-09-01,Belgrade,50.285714285714285)
(2021-09-02,2021-09-08,Belgrade,46.285714285714285)
(2021-09-09,2021-09-15,Belgrade,54.857142857142854)
(2021-09-16,2021-09-22,Belgrade,56.714285714285715)
(2021-09-23,2021-09-29,Belgrade,59.285714285714285)
(2021-09-30,2021-10-03,Belgrade,61.5)

虽然有时我得到了完整的一套:

(2021-06-28,2021-06-30,Belgrade,48.666666666666664)
(2021-07-01,2021-07-07,Belgrade,41.142857142857146)
(2021-07-08,2021-07-14,Belgrade,52.857142857142854)
(2021-07-15,2021-07-21,Belgrade,56.0)
(2021-07-22,2021-07-28,Belgrade,57.285714285714285)
(2021-07-29,2021-08-04,Belgrade,43.57142857142857)
(2021-08-05,2021-08-11,Belgrade,35.42857142857143)
(2021-08-12,2021-08-18,Belgrade,43.42857142857143)
(2021-08-19,2021-08-25,Belgrade,36.857142857142854)
(2021-08-26,2021-09-01,Belgrade,50.285714285714285)
(2021-09-02,2021-09-08,Belgrade,46.285714285714285)
(2021-09-09,2021-09-15,Belgrade,54.857142857142854)
(2021-09-16,2021-09-22,Belgrade,56.714285714285715)
(2021-09-23,2021-09-29,Belgrade,59.285714285714285)
(2021-09-30,2021-10-03,Belgrade,61.5)

我的数据流管道中是否有任何明显的错误?不知道为什么会发生这种情况。它也不总是发生在同一个城市。

可能发生了什么?

更新

因此,当我禁用水印时,问题似乎不再发生。我将水印策略更改为以下内容:

    final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
            WatermarkStrategy.<CityMetric>noWatermarks()  
                             .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());

到目前为止,我得到了一致的结果。当我检查留档时,它说:

静态水印策略 no水印()

创建完全不生成水印的水印策略。这在执行纯基于处理时间的流处理的方案中可能很有用。

但我不是在做基于时间的流处理,而是在做事件时间处理。

为什么< code > forMonotonousTimestamps()会出现我看到的奇怪行为?事实上,我的时间戳是单调递增的(否则< code>noWatermarks策略将不起作用),但不知何故,改变这种情况并不适合我的场景。

我在Flink的工作方式有什么缺失吗?

共有1个答案

韩彬
2023-03-14

Flink不支持每键水印。每个并行任务根据对流经该任务的所有事件的观察,独立生成水印。

因此,这不适用于 forMonotonousTimestamps 水印策略的原因是输入实际上不是按时间戳排序的。它在每个城市内按时间排序,但不是全局排序。这将导致某些记录延迟,但不可预测,具体取决于生成水印的确切时间。这些延迟事件被应包含它们的窗口忽略。

您可以通过多种方式解决这个问题:

(1)使用forBoundedOutOfOrness水印策略,其持续时间足以说明数据集中的实际乱序性。鉴于数据如下所示:

03/10/2021,GR,Athens,pressure,60,1017.9,1040.6,1020.9,542.4
28/06/2021,US,Atlanta,co,24,1.4,7.3,2.2,19.05

这将需要大约100天的故障持续时间。

(2)将窗口配置为具有足够的允许延迟。这将导致某些窗口被多次触发——一次是在水印指示它们可以关闭时,另一次是在每次将延迟事件添加到窗口时。

(3)使用< code>noWatermarks策略。这将导致作业仅在到达其输入文件末尾时才产生结果。对于连续的流作业,这是不可行的,但是对于有限(有界)输入,这是可行的。

(4) 在RuntimeExecutionMode下运行作业。批处理模式。然后,作业只会在消耗完所有输入后在最后产生结果。这将使用为批处理工作负载设计的更优化的运行时运行作业,但结果应与(3)相同。

(5)更改输入,使其不会乱序。

 类似资料:
  • 主要内容:1.窗口概述,2.窗口分类,3.细分,4.窗口Api,5.窗口分配器 Window Assigners,6.窗口函数,7.TopN 实例1.窗口概述 Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 在 Flink 中, 窗口就是用来处理无界流的核心。我们很容易把窗口想象成一个固定位置的“框”,数据源源不断地流过来,到某个时间点窗口

  • 我的目标是杀死Flink任务管理器,然后在Windows上重新启动它。这是刺激Flink中的错误所必需的,这样我就可以看到检查点的工作。我已经设法找到了任务管理器PID通过查找PID对它侦听的端口。所以杀死PID应该杀死taskManager,但是我找不到在windows上重启Flink taskManager的方法,因为taskManager脚本是一个外壳脚本。请帮助我在以下两个问题。 如何在W

  • 一、窗口概念 在大多数场景下,我们需要统计的数据流都是无界的,因此我们无法等待整个数据流终止后才进行统计。通常情况下,我们只需要对某个时间范围或者数量范围内的数据进行统计分析:如每隔五分钟统计一次过去一小时内所有商品的点击量;或者每发生1000次点击后,都去统计一下每个商品点击率的占比。在 Flink 中,我们使用窗口 (Window) 来实现这类功能。按照统计维度的不同,Flink 中的窗口可以

  • 我尝试使用flink sql窗口API:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table_api.html#group-windows

  • 我有一份flink工作,需要在1小时内重复删除收到的记录。重复数据消除后,我需要收集所有这些重复数据消除的文档,并进行一些聚合,如计数,然后生成目标主题。 现在,由于我只需要收集那些重复数据消除的文档,所以可能不需要等待1小时。我如何避免仅为收集这些文档而设置1个小时的窗口,但一旦收集到这些文档,就继续进行聚合。 因此,资源会占用内存,检查点大小也在增加,这是我想要避免的。 水印策略: 如有任何建

  • 这是一个关于flink流的两个问题的主题,基于我自己做的实验,我需要一些澄清。问题是: