当前位置: 首页 > 知识库问答 >
问题:

Flink按字段id对记录进行分组的最佳方法

陈毅
2023-03-14

我正在和Kafka经纪人联系阿帕奇·Flink。

我随机收到了以下消息:

  • 消息(时间戳=[…],索引=1,someData=[…])
  • 消息(时间戳=[…],索引=2,someData=[…])
  • 消息(时间戳=[…],索引=3,某些数据=[…])
  • 消息(时间戳=[…],索引=2,someData=[…])
  • 消息(时间戳=[…],索引=3,某些数据=[…])
  • 消息(时间戳=[…],索引=1,someData=[…])

我来自Kafka的记录有一个index字段。

在我的应用程序中,我需要计算具有相同id的最后两条记录,然后立即发送响应。

例如,这两个:

  • 消息(时间戳=[…],索引=1,someData=[…])

存储和计算具有相同索引字段的最后两条记录的最佳方法是什么?你能告诉我一些技巧吗?

共有1个答案

裴经义
2023-03-14

您的需求并不完全清楚,但您可能想了解的机制是使用keyBy(e-

如果您需要考虑时间戳,并且事件流是无序的,即使在索引的单个值内也是如此,那么您需要首先按时间戳对流进行排序。在这种情况下,如果您使用FlinkSQL进行排序,您将会有更轻松的时间,然后您可以使用match_recognize来进行模式识别,尽管对于这样一个简单的模式来说,这可能有点过分了。下面是一个如何进行排序的示例。

 类似资料:
  • 我有以下模式- [名称:StringType,Grades:ArrayType(StructType(StructField(subject_grades),ArrayType(StructType(StructField(subject,StringType,false)),StructField(grade,LongType,false) 我想在数组中的subject字段上,该数组位于grad

  • 我需要创建一个额外的XML标记,比如myGroup&因为我需要创建来自输入的每个ID_Number的组。 以下是我的输入: 我想根据ID_Numbers创建组,以便所有具有公共ID的mySegments都位于myGroup标记下,如下所示: 我知道使用XSLT可以很容易地完成,但我必须使用Groovy来实现这一点。请告知这是否可以由Groovy完成。我尝试了以下链接,但不确定他们如何可以应用到我的

  • 问题内容: 我有一个这样的表: 我需要一个查询以n分钟的间隔对记录进行分组。 例如,输出(按60分钟分组): 到目前为止,由于间隔介于0到60分钟之间,因此我一直在使用此查询按10分钟间隔进行分组: 但是现在间隔可以是例如 125436758 分钟。 我无法创建新表,并且用户定义的间隔必须为n分钟。我正在使用SQL SERVER2012。谢谢。 问题答案:

  • 问题内容: 我的索引具有如下所述的数据。 如何在Java中编写Elasticsearch查询。如果我按日期搜索2016-13-01T12:00:00我希望看到每个groupId的最新版本,其indexDate小于或等于搜索日期? 预期输出: 我没有在Elasticsearch的日期字段中看到max函数来实现这一点。 问题答案: 我将首先进行汇总,然后使用子汇总,以降序排序并返回该存储区的第一个文档

  • 问题内容: 这是我想做的: 使搜索主题与表格中的多个字段匹配 根据字段的重要性和匹配的相关性对结果进行排序(按该顺序) 例如:假设我有一个博客。然后有人搜索“ php”。结果将显示为: 首先,针对“标题”字段的匹配,按相关性排序 然后,根据相关性对“ body”字段进行匹配 以此类推,以指定的字段… 我实际上是用PHP中的类完成此操作的,但是它使用了大量的UNIONS(很多!),并且随着搜索主题的

  • 如何用条件(type=“block”)对所有记录进行分组? 我已尝试:db.itr.aggregate({$match:{“UIN”:“1396599472869”}},{$project:{“VM”:1}},{$group:{_id:null,r1:{$push:“$VM”}}},{$unwind:“$R1”},{$group:{_id:null,r2:{$push:“$R1”}},{$unwi