当前位置: 首页 > 知识库问答 >
问题:

Kafka流加窗口的关键是人类可读

姜羽
2023-03-14

我正在对kafka流进行窗口聚合。它工作正常并进行正确的聚合。这是scala中的代码。CallRecord是一个案例类。

    builder
  .stream[String, String](input_topic)
  .mapValues((elm) => {
    parse(elm).extract[CallRecord]
  })
  .groupBy((key, value) => {
    value.agentId
  })
  .windowedBy(every15Minute)
  .aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
    CallRecordAggByAgent(
      callRecord.agentId,
      ((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
      ((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
      ((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
      aggregator.count + 1
    )
  })
  .mapValues((elm) => {
    write(elm)
  })
  .toStream
  .to(output_topic)

当我试图从KSQLDB中读取这个内容时,当我创建一个关于这个主题的流时,我看到rowkey的值如下3w�H�@ 我知道这是反序列化问题,但我希望能够在KSQL中直接反序列化,或者在流式传输到输出主题时使其长度达到毫秒。我的理解是这应该很容易实现,但我想我错过了一些细微差别。


共有1个答案

沃阳飙
2023-03-14

我给出的解决方案如下。显然这并不难。

import io.circe.generic.auto._
import org.json4s._
import org.json4s.native.Serialization.write

builder
      .stream[String, String](args.INPUT_TOPIC)
      .mapValues((elm) => {
        parse(elm).extract[CallRecord]
      })
      .groupBy((key, value) => {
        value.agentId
      })
      .windowedBy(every15Minute)
      .aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
        CallRecordAggByAgent(
          callRecord.agentId,
          ((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
          ((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
          ((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
          aggregator.count + 1
        )
      })
      .mapValues((elm) => {
        write(elm)
      })
      .toStream
      .selectKey((k, v) => {
        s"${k.key()}@${k.window().startTime().toEpochMilli.toString}"
      })
      .to(args.OUTPUT_TOPIC)

selectKey提供了更改分组键的可能性,因此在流式传输到输出主题之前,我从键中提取时间戳,并将其设置为字符串。

 类似资料:
  • 我们正在使用kafka streams的windows join连接2个流,我们想知道: 为什么KS要在内部主题上增加24小时?例如,我们有一个1小时的窗口,但内部主题保留25小时。我们可以将其配置为不添加这些24小时吗 [更新] 例如,我们创建JoinWindow如下: 虽然我可以看到内部主题(JOINTHIS和OUTEROTHER)是用 这是刚刚在我的机器上的一个空代理(使用confluent

  • 我很难理解窗口在Kafka Streams中是如何工作的。到目前为止,结果似乎与我所阅读和理解的不一致。 我已经创建了一个带有支持主题的KSQL流。KSQL SELECT语句中的“列”之一已被指定为该主题的TIMESTAMP。 my-stream主题中的记录按键(PARTITION_KEY)分组,并用跳转窗口窗口 记录通过 然后我通过 组中的第一个窗口转换为7:00-7:05 当我通过控制台消费者

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 我正在窗口流上执行聚合,希望抑制早期聚合结果。我所说的早期结果是指在窗口结束前计算的结果,而不是那些在宽限期内发生的结果。因此,我想用时间戳抑制所有聚合结果 最小Kafka流拓扑示例: 因此,不是我的选择,因为我必须等到宽限期到期,这可能会很长。 根据KIP-328,使用

  • 假设我有三个Kafka主题,其中充满了表示不同聚合中发生的业务事件的事件(事件源应用程序)。这些事件允许构建具有以下属性的聚合: 用户:usedId,名称 应用程序的模块:moduleId,name 授予用户应用程序模块:grantId、userId、moduleId、作用域 现在,我想创建一个包含所有授权的流,其中包含用户和产品的名称(而不是id)。我想这么做: 通过按用户ID对事件进行分组,为