当前位置: 首页 > 知识库问答 >
问题:

Kafka流窗口是如何工作的?

严兴言
2023-03-14

我很难理解窗口在Kafka Streams中是如何工作的。到目前为止,结果似乎与我所阅读和理解的不一致。

我已经创建了一个带有支持主题的KSQL流。KSQL SELECT语句中的“列”之一已被指定为该主题的TIMESTAMP。

CREATE STREAM my_stream WITH (KAFKA_topic='my-stream-topic', VALUE_FORMAT='json', TIMESTAMP='recorded_timestamp') AS select <select list> PARTITION BY PARTITION_KEY;

my-stream主题中的记录按键(PARTITION_KEY)分组,并用跳转窗口窗口

val dataWindowed: TimeWindowedKStream[String, TopicValue] = builder.stream('my-stream-topic', consumed) 
    .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
    .windowedBy(TimeWindows.`of`(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.MINUTES.toMillis(5)))

记录通过

val dataAgg: KTable[Windowed[String], ValueStats] = dataWindowed
    .aggregate(
      new Initializer[TopicStats] {<code omitted>}},
      new Aggregator[String, TopicValue, TopicStats] {<code omitted>}},
      Materialized.`as`[String, TopicStats, WindowStore[Bytes, Array[Byte]]]("time-windowed-aggregated-stream-store")
        .withValueSerde(new JSONSerde[TopicStats])
    )

  val topicStats: KStream[String, TopicValueStats] = dataAgg
    .toStream()
    .map( <code omitted for brevity>)

然后我通过

dataAgg.print()
topicStats.print()

组中的第一个窗口转换为7:00-7:05

当我通过控制台消费者检查流主题中的记录时,我看到有两条记录应该在上面的窗口中。然而,聚合器只提取其中的一个。

我认为dataAgg窗口KTable将包含1条分组键的记录,但聚合将使用2条记录来计算聚合。打印的聚合值不正确。

我错过了什么?

共有1个答案

温举
2023-03-14

KSQL可以在写入时设置记录时间戳,但是您需要在创建输入流时指定时间戳,而不是在定义输出流时。即,为输入流指定的时间戳将用于在写入时设置记录元数据字段。

这种行为相当不直观,我为此问题开了一张罚单:https://github.com/confluentinc/ksql/issues/1367

因此,在为问题中显示的查询创建输入流时,您需要指定with(TIMESTAMP='recorded_timestamp')子句。如果这是不可能的,因为您的查询需要在不同的时间戳上运行,您需要指定将数据复制到新主题中的第二个查询。

CREATE STREAM my_stream_with_ts
    WITH (KAFKA_topic='my-stream-topic-with-ts')
AS select * from my_stream PARTITION BY PARTITION_KEY;

作为替代方案,您可以为您的Kafka Streams应用程序设置自定义时间戳提取器,以从有效负载中提取时间戳。

 类似资料:
  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 下面给出的kafka producer程序不是通过Eclipse在Windows中运行的,而是在Unix平台上运行的(即,当我在承载kafka代理的Unix中运行它时,它工作正常)。windows不支持Kafka制作人吗?但是,我可以从windows计算机ping ip地址。请帮忙。 这是我得到的异常错误。 log4j:WARN找不到记录器的附加程序(kafka.utils.VerifiableP

  • 我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的

  • 主要内容:发布订阅消息传递的工作流,队列消息/消费者组的工作流,ZooKeeper的角色截至目前,我们已经了解了Kafka的核心概念。 现在让我们来看看Kafka的工作流程。 Kafka只是分成一个或多个分区的主题集合。 Kafka分区是消息的线性排序序列,每个消息由其索引标识(称为偏移量)。 Kafka集群中的所有数据都是不相关的分区联合。 传入消息写在分区的末尾,消费者依次读取消息。 通过将消息复制到不同的经纪人来提供持久性。 Kafka以快速,可靠,持久的容错和零停机方式提供基

  • 我们正在使用kafka streams的windows join连接2个流,我们想知道: 为什么KS要在内部主题上增加24小时?例如,我们有一个1小时的窗口,但内部主题保留25小时。我们可以将其配置为不添加这些24小时吗 [更新] 例如,我们创建JoinWindow如下: 虽然我可以看到内部主题(JOINTHIS和OUTEROTHER)是用 这是刚刚在我的机器上的一个空代理(使用confluent