当前位置: 首页 > 知识库问答 >
问题:

使用KStream语义重新组合

朱硕
2023-03-14

使用 kafka 流,我想按某个键 K1 对元素 E 的流 S 进行分组,同时将同一键的所有值聚合到一个连接的结果 AGG 中。这会产生 KTable T1

根据聚合结果,该值应重新分区到另一个 KTable T2 中,该 T2 按从聚合结果 AGG 中获取的键 K2 分组。因此,聚合结果应为下一次重新分组生成密钥。

最后,我只对一个KTableT2感兴趣,其中键是K2,值是AGG

但是,这不起作用。我只得到最后一个值的 KTable T。不是每个键 K2 的值

我知道聚合结果只会在一段时间后转发,所以我已经尝试降低commit.interval。ms设置为1,但无效。

我还尝试使用< code>through并将中间结果写入一个流中,但也没有成功。

val finalTable = streamBuilder.kstream("streamS")
                    .groupBy{ k, v -> createKey1(k, v) }
                    .aggregate(
                            { Agg.empty() },
                            { k, v, previousAgg ->
                                Agg.merge(previousAgg, v)
                            })
                    .toStream()
//                    .through("table1")
                    .groupBy { k1, agg -> agg.createKey2()}
                    .reduce{ _, agg -> agg }

对于包含以下值的流S
key1=“123”,id=“1”,startNewGroup=“false”
key1=“234”,id=“2”,startNew Group=“false”
>key1=”123“,id=”3“,startNewGroup=”false“
key1=”123“”,id=”4“,startNew Group=”true“
>key1=”234“,id=“5”,startNewGroup=“false”
key1=“123”,id=“6”,startNewGroup=“false”
key1=“123”,id=“7”,startNewGroup:“false”
key1=“23”,id=“8”,startNew Group=“true”

我希望最终结果是一个具有以下最新键值的Ktable:
key: 123-1,value:'key 1="123",key 2="123-1",id="1,3"
key: 234-2,value:'key 1="234",key 2="234-2",id="2,5"
key: 123-4,value:'key 1="123",key 2="123-4",id="4,6,7"'
key: 123-8,value:'key 1="123",key 2="123-8",id="8"'

元素的原始流S首先由key1进行分组,其中聚合结果包含groupby keykey 1,并添加一个额外的字段key2(包含key 1与第一次出现的id的组合)
每当聚合收到startNewGroup设置为true的值时,它都会返回一个聚合,其中key2字段设置为新值的key1id,从而有效地创建了一个新的子组
在第二次重新分组中,我们通过key2字段进行简单分组。

然而,我们真正观察到的是:
键:234-2,值:'key1=“234”,key2=“234-2”,ids=“2,5”'
键=123-8,值:'key1=“123”,key1=“123-8”,id=“8”'

共有1个答案

上官思博
2023-03-14

对于您的用例,最好使用处理器 API。处理器API可以很容易地与Kafka Streams DSL(处理器API集成)结合使用。

您必须实现CustomTransformer,它将为状态存储中的特定密钥聚合消息。当<code>startNewGroup=true</code>消息到达时,密钥的旧消息将转发到下游,新聚合将开始

您示例转换器可能如下所示:

import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

case class CustomTransformer(storeName: String) extends Transformer[String, Value, Agg] {

  private var stateStore: KeyValueStore[String, Agg] = null
  private var context: ProcessorContext = null

  override def init(context: ProcessorContext): Unit = {
    this.context = context
    stateStore = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, Agg]]
  }

  override def transform(key: String, value: Value): Agg = {
    val maybeAgg = Option(stateStore.get(key))

    if (value.startNewGroup) {
      maybeAgg.foreach(context.forward(key, _))
      stateStore.put(key, Agg(value))
    }
    else
      stateStore.put(key, maybeAgg.map(_.merge(value)).getOrElse(Agg(value)))
    null
  }

  override def close(): Unit = {}
}
 类似资料:
  • 我正在尝试构建以下拓扑: > 使用Debezium连接器,我拉出2个表(我们称它们为表A和表DA)。根据DBZ,存储表行的主题具有{before:“...”,after:“...”}结构。 在我的拓扑中,第一步是从这两个“表”主题创建“干净的”KStreams。那里的子拓扑大致如下所示: 请注意,我显式地分配记录时间,因为表行将在它们最初发布后被CDC'ed“年”。该函数目前正在做的是伪造从201

  • 我们正在尝试实现下面描述的用例,我们有我们希望克服的实现问题, 用例, 我们试图通过匹配两个流的消息中的键(JSON)来实现两个Kafka主题之间的KStream连接。此外,我们还应该维护消息序列,因为它是从源代码到达KStream的。 场景是,如果匹配的键还没有到达任何一个流,我们应该停止或重试join,直到预期的键到达其他主题。我们想把不匹配的记录放回KStream,但在这种情况下,序列没有保

  • 我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所

  • 本文向大家介绍易语言重定义数组命令使用讲解,包括了易语言重定义数组命令使用讲解的使用技巧和注意事项,需要的朋友参考一下 重定义数组命令 英文命令:ReDim 所属类别:数组操作 本命令可以重新定义指定数组的维数及各维的上限值。 语法:  无返回值  重定义数组(欲重定义的数组变量,是否保留以前的内容,数组对应维的上限值,… ) 例程: 说明: 重新定义数组的各项属性。 注: 如果需要保留以前内容需

  • 无法使用ByteBuddy重新定义java.io.ObjectInputStream。 我已经尝试了很多方法来解决这个问题。但我找不到钩住“java.io.ObjectInputStream$resolveClass”的方法。你能帮助我吗?

  • 我正在发送JSON消息,其中包含有关web服务请求和对Kafka主题的响应的详细信息。我想在每条消息到达Kafka时使用Kafka流进行处理,并将结果作为持续更新的摘要(JSON消息)发送到客户端连接的websocket。 然后,客户端将解析JSON,并在网页上显示各种计数/摘要。 示例输入消息如下所示 随着信息流进入Kafka,我希望能够实时计算 ** 请求总数,即所有请求的计数 invocat