当前位置: 首页 > 知识库问答 >
问题:

使用Flink和基于事件时间的流计算平均值

彭宏阔
2023-03-14

我想用基于历史事件的流计算Flink中基于窗口的平均值(或我定义的任何其他函数),因此流必须是事件时间(而不是基于处理时间):

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

我已经了解了如何在摄入时添加时间戳:

ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis)

但是当我进行计算(应用函数)时,当我只是以与没有EventTime时相同的方式进行计算时,它就不起作用了。我读过一些关于我必须设置的水印的东西:

val avg = stream
  .keyBy("instrument")
  .timeWindow(Time.seconds(10))
  .apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{
    val avg = values.map(_.val).sum / values.size
    val dp = Datapoint(key.getField[String](0), avg)
    out.collect(dp)
  })

avg.print()
env.execute()

有没有人举一个简单的Scala例子?

尊敬的安德烈亚斯

共有1个答案

汲丰茂
2023-03-14

水印是一种有效的时间戳,它断言具有较早时间戳的所有事件(可能)已经到达。基于事件时间的窗口依赖于水印来知道窗口何时完成。到目前为止,最常见的水印策略是假设事件到达时有一定的延迟。

如果要在数据源中发出水印(在摄取期间),请参阅带有时间戳和水印的源函数,但它将像

ctx.emitWatermark(new Watermark(datapoint.getWatermarkTime))

另一方面,如果要在源代码之外处理此问题,请参阅时间戳赋值器/水印生成器和允许固定延迟量的赋值器。您可以简单地执行以下操作:

stream
  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10))( _.getTimestamp ))
  .keyBy("instrument")
  ...

我链接到的文档中有更详细的Scala示例。

 类似资料:
  • Flink(批处理/流式处理)中是否有方法同时计算字段的平均值和总和?使用聚合方法,我可以计算groupBy结果中字段的和,但如何同时计算平均值呢?下面的示例代码。

  • 我需要有关构建SQL查询的帮助: 这是我用来存储测试运行统计信息的postgres表。 此表包含测试开始时间、结束时间和状态。我需要计算由于测试失败而使用的时间间隔。即测试失败和下一个立即测试开始之间的时间间隔。 即对于每个测试失败的记录,获取end_date并获取同一测试的下一个即时记录的start_date。计算时间差。将所有此类失败记录的持续时间相加,并按失败次数计算。以获得平均值。 例子:

  • 问题内容: 我有一条流经多个系统的消息,每个系统都会记录消息的进入和退出以及时间戳和uuid messageId。我通过以下方式提取所有日志: 结果,我现在有以下事件: 我想生成一个报告(最好是堆积的条或列),用于每个系统的时间: 做这个的最好方式是什么?Logstash过滤器?kibana计算字段? 问题答案: 您只能使用Logstash 过滤器来实现此目的,但是,您必须实质性地重新实现该过滤器

  • 我有一些能量计,将继续产生计数器值,这是一个累积指标。即不断增加,直到计数器复位。 有一个实时ETL作业,它在事件时间的两个连续值之间进行减法。 例如。 此外,有时事件可能没有按顺序接收。 如何使用Apache Flink流式API实现?最好使用Java中的示例。

  • 问题内容: 编辑:我已经写了平均的代码,但我不知道如何使它也使用从我的args.length而不是数组的整数 我需要编写一个Java程序,该程序可以计算:1.读入的整数数2.平均值-不必是整数! 注意!我不想从数组中计算平均值,但是要在args中计算整数。 目前我已经写了这个: 谁能指导我正确的方向?还是举个例子,以书面形式指导我塑造这段代码? 提前致谢 问题答案: 只需对您的代码进行一些小的修改

  • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想