使用 kafka 流,我想按某个键 K1
对元素 E
的流 S
进行分组,同时将同一键的所有值聚合到一个连接的结果 AGG
中。这会产生 KTable T1
。
根据聚合结果,该值应重新分区到另一个 KTable T2 中,该 T2
按从聚合结果 AGG
中获取的键 K2
分组。因此,聚合结果应为下一次重新分组生成密钥。
最后,我只对一个KTableT2
感兴趣,其中键是K2
,值是AGG
但是,这不起作用。我只得到最后一个值的 KTable T
。不是每个键 K2
的值
我知道聚合结果只会在一段时间后转发,所以我已经尝试降低commit.interval。ms
设置为1,但无效。
我还尝试使用< code>through并将中间结果写入一个流中,但也没有成功。
val finalTable = streamBuilder.kstream("streamS")
.groupBy{ k, v -> createKey1(k, v) }
.aggregate(
{ Agg.empty() },
{ k, v, previousAgg ->
Agg.merge(previousAgg, v)
})
.toStream()
// .through("table1")
.groupBy { k1, agg -> agg.createKey2()}
.reduce{ _, agg -> agg }
对于包含以下值的流S
:key1=“123”,id=“1”,startNewGroup=“false”
key1=“234”,id=“2”,startNew Group=“false”
>key1=”123“,id=”3“,startNewGroup=”false“
key1=”123“”,id=”4“,startNew Group=”true“
>key1=”234“,id=“5”,startNewGroup=“false”
key1=“123”,id=“6”,startNewGroup=“false”
key1=“123”,id=“7”,startNewGroup:“false”
key1=“23”,id=“8”,startNew Group=“true”
我希望最终结果是一个具有以下最新键值的Ktable:key: 123-1,value:'key 1="123",key 2="123-1",id="1,3"
key: 234-2,value:'key 1="234",key 2="234-2",id="2,5"
key: 123-4,value:'key 1="123",key 2="123-4",id="4,6,7"'
key: 123-8,value:'key 1="123",key 2="123-8",id="8"'
元素的原始流S
首先由key1
进行分组,其中聚合结果包含groupby keykey 1
,并添加一个额外的字段key2
(包含key 1
与第一次出现的id
的组合)
每当聚合收到startNewGroup
设置为true
的值时,它都会返回一个聚合,其中key2
字段设置为新值的key1
和id
,从而有效地创建了一个新的子组
在第二次重新分组中,我们通过key2
字段进行简单分组。
然而,我们真正观察到的是:键:234-2,值:'key1=“234”,key2=“234-2”,ids=“2,5”'
键=123-8,值:'key1=“123”,key1=“123-8”,id=“8”'
对于您的用例,最好使用处理器 API。处理器API可以很容易地与Kafka Streams DSL(处理器API集成)结合使用。
您必须实现CustomTransformer,它将为状态存储中的特定密钥聚合消息。当<code>startNewGroup=true</code>消息到达时,密钥的旧消息将转发到下游,新聚合将开始
您示例转换器可能如下所示:
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
case class CustomTransformer(storeName: String) extends Transformer[String, Value, Agg] {
private var stateStore: KeyValueStore[String, Agg] = null
private var context: ProcessorContext = null
override def init(context: ProcessorContext): Unit = {
this.context = context
stateStore = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, Agg]]
}
override def transform(key: String, value: Value): Agg = {
val maybeAgg = Option(stateStore.get(key))
if (value.startNewGroup) {
maybeAgg.foreach(context.forward(key, _))
stateStore.put(key, Agg(value))
}
else
stateStore.put(key, maybeAgg.map(_.merge(value)).getOrElse(Agg(value)))
null
}
override def close(): Unit = {}
}
我正在尝试构建以下拓扑: > 使用Debezium连接器,我拉出2个表(我们称它们为表A和表DA)。根据DBZ,存储表行的主题具有{before:“...”,after:“...”}结构。 在我的拓扑中,第一步是从这两个“表”主题创建“干净的”KStreams。那里的子拓扑大致如下所示: 请注意,我显式地分配记录时间,因为表行将在它们最初发布后被CDC'ed“年”。该函数目前正在做的是伪造从201
我们正在尝试实现下面描述的用例,我们有我们希望克服的实现问题, 用例, 我们试图通过匹配两个流的消息中的键(JSON)来实现两个Kafka主题之间的KStream连接。此外,我们还应该维护消息序列,因为它是从源代码到达KStream的。 场景是,如果匹配的键还没有到达任何一个流,我们应该停止或重试join,直到预期的键到达其他主题。我们想把不匹配的记录放回KStream,但在这种情况下,序列没有保
我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所
本文向大家介绍易语言重定义数组命令使用讲解,包括了易语言重定义数组命令使用讲解的使用技巧和注意事项,需要的朋友参考一下 重定义数组命令 英文命令:ReDim 所属类别:数组操作 本命令可以重新定义指定数组的维数及各维的上限值。 语法: 无返回值 重定义数组(欲重定义的数组变量,是否保留以前的内容,数组对应维的上限值,… ) 例程: 说明: 重新定义数组的各项属性。 注: 如果需要保留以前内容需
无法使用ByteBuddy重新定义java.io.ObjectInputStream。 我已经尝试了很多方法来解决这个问题。但我找不到钩住“java.io.ObjectInputStream$resolveClass”的方法。你能帮助我吗?
我正在发送JSON消息,其中包含有关web服务请求和对Kafka主题的响应的详细信息。我想在每条消息到达Kafka时使用Kafka流进行处理,并将结果作为持续更新的摘要(JSON消息)发送到客户端连接的websocket。 然后,客户端将解析JSON,并在网页上显示各种计数/摘要。 示例输入消息如下所示 随着信息流进入Kafka,我希望能够实时计算 ** 请求总数,即所有请求的计数 invocat