当前位置: 首页 > 知识库问答 >
问题:

我们能从历史数据中创建Kafka时间窗口流吗?

潘安邦
2023-03-14

我有一些历史数据,每条记录都有它们的时间戳。我想阅读它们并将它们输入到Kafka主题中,并使用Kafka流以时间窗口的方式处理它们。

现在的问题是,当我创建kafka流时间窗口聚合处理器时,我如何告诉kafka使用记录中的时间戳字段来创建时间窗口,而不是真正的实时时间?

共有1个答案

浦修文
2023-03-14

您需要创建一个自定义的TimestampExtractor,它将从记录本身提取值-文档中有一个例子,这里也有。我还发现了这个要点,看起来很相关。

 类似资料:
  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。

  • 假设我有一个 inputStream,我对它执行了一些窗口操作。通过对事件执行某些窗口操作而创建的事件的时间戳是什么? 现在我想组合流countStream和maxStream,以找到最后一秒的countStream等于maxStream的所有时间戳。 注意:这并不是我试图解决的问题,但这是一个代表性的例子。解决这个问题将帮助我解决我需要解决的真正问题。

  • 我必须从Postgres表中读取配置并广播它,以使用它过滤主数据流。我正在使用Flink广播状态进行此操作。当我从本地套接字获取配置时,它工作得很好。 用例是在Flink作业中从Postgres读取最新配置,而无需重新启动作业。 我们可以从Postgres表创建Flink数据流吗?如果可能的话,它是否有效,因为它将永远保持JDBC连接的活性?

  • 为什么会这样?如果我在“assigntimestamps(timestampExtractor)”之前添加“keyby(keySelector)”,那么程序可以工作。有人能解释一下原因吗?

  • 我们使用流,并将每个消息发布到另一个主题,该主题按用户id对记录进行分区(按用户id重新分区原始流)。 然后我们消耗这个重新分区的流,我们将消耗的记录存储在加窗10分钟的本地状态存储中。一个特定用户的所有点击总是在同一个分区中,但顺序并不保证,因为最初的主题有10个分区。 我理解Kafka流的窗口模型,当新记录进入时,时间会提前,但我需要这个窗口使用处理时间,而不是事件时间,然后当窗口过期时,我需