当前位置: 首页 > 知识库问答 >
问题:

如何在flink中通过同一插槽上的某些键聚合日期,以便保存网络呼叫

奚才良
2023-03-14

我的flink作业到现在为止对客户端id执行KeyBy操作,并使用窗口操作符累积1分钟的数据,然后聚合数据。聚合之后,我们将这些累积的数据存储在hdfs文件中。唯一密钥(客户端id)的数量每天超过7000万。

问题是,当我们做keyBy时,它会在集群上分发数据(我的假设),但我希望数据在同一个插槽(或节点)上聚集1分钟,用于传入事件。

注意:在接收器中,我们可以在1分钟窗口内为同一客户端提供多个数据。我要保存网络电话。

共有1个答案

寿浩言
2023-03-14

您是对的,当数据被分区/分布时,执行stream.keyby()会导致网络流量(当然,假设您有并行性>1)。但是标准的窗口操作符需要一个键控流。

您可以创建一个实现CheckPointedFunction接口的ProcessFunction,并使用它来维护非键控流中的状态。但是您仍然必须实现自己的计时器(标准的Flink计时器需要一个键控流),并将时间窗口保存为状态的一部分。

您可以编写自己的自定义RichFlatMapFunction,并在内存中对预加键的聚合执行Mapdo。您仍然需要使用keyby()和窗口操作来执行聚合,但网络流量会少得多。

我想这是无国籍的也可以。但您可能需要将其设置为LRU缓存,以避免内存耗尽。你需要创建自己的定时器来冲洗窗户。

但金科玉律是先测量,再优化。如确认网络流量确实是一个问题,然后执行直升机特技以尝试减少它。

 类似资料:
  • 问题内容: 我有一个数据集,其中包含对频率为2分钟的几周的观察。我想将时间间隔从2分钟增加到5分钟。问题在于,观察的频率并不总是相同的。我的意思是,从理论上讲,每10分钟应进行5次观察,但通常情况并非如此。请让我知道如何根据平均功能以及观察的时间和日期汇总观察。换句话说,基于每5分钟的汇总,而对于每5分钟的时间间隔,观察次数却不同。此外,我有时间戳格式的日期和时间。 示例数据: 预期成绩: 问题答

  • 我创建了一个。ctl文件,以便在Oracle11g中插入一些数据。在输入文件中,有一个名为LAST_DATE的列,其格式为(MM/DD/YYYY),在oracle中也有相同的列,其日期为datatype。但是当我试图插入数据时,它的格式会自动从(mm/dd/yyyy)或(04/25/2016)更改为(dd/mm/yyyy)或(25-04-2016)。我怎样才能解决这个问题。我还在我的ctl文件中定

  • 我有一个用例,其中我收到包含不同信息集的事件流,并希望对它们执行聚合。对于这些聚合中的每一个,都需要多个翻滚窗口,例如:每日,每周,每月,每年等。 聚合最初是所看到的计数的基本加法,但后来可能是对这些事件的一些分析/联接处理。因此,如果一个事件A每天来一次,另一个事件B每周来一次,结果将是这样的: 用例只是围绕翻滚的窗口而不是滑动窗口,我正在研究如何实现这个用例。主要问题是我不想等到窗口结束,而是

  • 我正在遵循这个关于通过java API创建YarnApp的示例。 https://github.com/hortonworks/simple-yarn-app 工作正常,但日志只存在于执行中,之后日志就消失了。 我怎么能通过代码捕捉到这个?或者启用一个选项?

  • 我对流/事件处理有些陌生,但我遇到了以下问题。 我正在处理来自Kafka的发票事件,有一个事件“时间戳”以及一个“Scheduledat”日期(时间戳),并希望对发票“总计”执行每日聚合。在传统DB中,我会执行以下操作: 然而,当在流上下文中考虑这一点时,我尝试使用'eventTime'(取自事件'timestamp')和1天窗口。问题是我真的很想使用'scheduled at‘时间戳,然而,它是

  • 问题内容: 此查询将保存完整的日期和时间。但我只想保存时间而不是数据库中的日期。有任何查询可以做到这一点吗? 问题答案: 您的列必须设置为DATETIME或TIMESTAMP。 如果使用TIME类型,则查询将按预期工作。 如果您使用任何其他类型的列,则可以使用其他答案提到的CURTIME()方法或CAST(column AS TIME),但是这会占用更多的磁盘空间,并且如果使用select会导致查