当前位置: 首页 > 知识库问答 >
问题:

如何在Flink中处理以日期时间为中心的非EventTime字段的窗口化/聚合?

汪明德
2023-03-14

我对流/事件处理有些陌生,但我遇到了以下问题。

我正在处理来自Kafka的发票事件,有一个事件“时间戳”以及一个“Scheduledat”日期(时间戳),并希望对发票“总计”执行每日聚合。在传统DB中,我会执行以下操作:

SELECT AVG(total)
FROM "Invoice"
WHERE date_trunc('day', "scheduledAt") = date_trunc('day', CURRENT_TIMESTAMP)

然而,当在流上下文中考虑这一点时,我尝试使用'eventTime'(取自事件'timestamp')和1天窗口。问题是我真的很想使用'scheduled at‘时间戳,然而,它是一个字段,可以在很远的将来改变,从事件发生的时候,甚至可能回到过去。

目前,我很难协调如何使用流/窗口来实现类似的功能,特别是当'scheduled At‘datetime可以在时间上向前和向后改变时。

在Flink的模式或技术上有什么建议,我可以遵循以这种方式实现每日平均吗?

共有1个答案

翁硕
2023-03-14

这不是流式处理最明显的用例,但我将提供一个如何使其工作的草图。

假设有必要容纳对scheduledat字段的所有方式的更改,那么就有必要保持足够的状态,以便根据第一原则重新计算所有内容。

这里有一种使用Flink的方法。用于创建发票以及重新安排发票(以及任何类型的更新)的事件流。假设所有的汇总都是每天进行的,那么通过date_trunc('day',“scheduledat”)等量对流进行键控,以便将同一天的所有发票汇总在一起。

您可以使用windows或ProcessFunction来执行聚合,但我假设使用ProcessFunction。您可以使用托管、键控状态保存每一天的所有发票,并在新信息到达时发出当天发票的连续更新报告流,或者使用计时器以适当的时间间隔发出报告。

根据正在计算的合计数,可能没有必要将所有发票保持在Flink状态。您可以简单地存储聚合值,并在新发票(和发票更新)到达时更新这些聚合值。例如,如果只需要报告每天的平均数和合计数,则可以在valueStatereducingstate中保留一个计数器和一个正在运行的合计数,然后根据这些值计算平均数。但如果您还需要报告最大发票,那么您将不得不将它们全部存储起来。

 类似资料:
  • 我是新的apache flink,并试图了解事件时间和窗口的概念是如何处理的flink。 下面是我的设想: > 我有一个程序,它以线程的形式运行,每秒创建一个包含3个字段的文件,其中第3个字段是时间戳。 虽然每隔5秒我会在创建的新文件中输入一个旧的时间戳(可以说是t-5),但还是有一些调整。 现在,我运行流处理作业,将上面的3个字段读入一个元组。 现在,我定义了以下用于水印和时间戳生成的代码: 然

  • 我有一些问题。 基于类中的时间戳,我想做一个逻辑,排除在1分钟内输入N次或更多次的数据。 UserData类有一个时间戳变量。 起初我试着用一个翻滚的窗户。 但是,滚动窗口的时间计算是基于固定时间的,因此无论UserData类的时间戳如何,它都不适合。 如何处理流上窗口UserData类的时间戳基? 谢谢。 附加信息 我使用这样的代码。 我试了一些测试。150个样本数据。每个数据的时间戳增加1秒。

  • 我正在学习如何使用Flink处理流数据。 根据我的理解,我可以多次使用函数进行各种转换。 表示数据源不断向Flink发送字符串。所有字符串都是JSON格式的数据,如下所示: 下面是我的代码: 正如您所看到的,我的示例非常简单:获取并反序列化数据-->将string转换为Json对象-->将Json对象转换为string并获取所需内容(这里只需要)。 就目前而言,似乎一切都很好。我确实从日志文件中获

  • 问题内容: 我将Excel文件导入Access 2010,日期字段(CALLDATE)以文本(YYYYMMDD)的形式出现。我想使用更新查询来更新新字段“ dateofcall”,但要使用日期/时间格式。我尝试使用: 我以为它会这么简单,但是它以日期格式显示为空白。我也尝试使用DateSerriel(),但仍然出现错误。有什么建议? 问题答案: 您可以使用左,右和中间字符串函数从字符串的各个部分构