当前位置: 首页 > 知识库问答 >
问题:

Kafka流-如何为KTable设置新密钥

晏正豪
2023-03-14

我是Kafka Streams的新手,我正在使用1.0.0版。我想从其中一个值为KTable设置一个新密钥。

在使用KStream时,可以使用如下方法selectKey()完成。

kstream.selectKey ((k,v) -> v.newKey)

然而,KTable中缺少这种方法。唯一的方法是将给定的KTable转换为KStream。对这个问题有什么想法吗?它改变了KTable设计的关键?

共有3个答案

邴和雅
2023-03-14

有一种方法可以从流中提取密钥并直接转换为KTable:

       KTable<String, User> userTable = builder
            .stream("topic_name", Consumed.with(userIdSerde, userSerde))
            .selectKey((key, value) -> key.getUserId())             
            .toTable( Materialized.with(stringIdSerde, userSerde));

详情可以在这里找到

程阳平
2023-03-14

@马蒂亚斯的回答让我走上了正确的道路,但我认为有一段代码样本可能会有所帮助

final KTable<String, User> usersKeyedByApplicationIDKTable = usersKTable.groupBy(
        // First, going to set the new key to the user's application id
        (userId, user) -> KeyValue.pair(user.getApplicationID().toString(), user)
).aggregate(
        // Initiate the aggregate value
        () -> null,
        // adder (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user,
        // subtractor (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user
);

KGroupedTable aggregate()文档:https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate-组织。阿帕奇。Kafka。溪流。kstream。初始化器组织。阿帕奇。Kafka。溪流。kstream。聚合器组织。阿帕奇。Kafka。溪流。kstream。聚合器组织。阿帕奇。Kafka。溪流。kstream。具体化-

谷梁承宣
2023-03-14

如果要设置新密钥,需要对KTable进行重新分组:

KTable newTable = table.groupBy(/*put select key function here*/)
                       .aggregate(...);

因为一个键对于一个KTable(与KStream相反)必须是唯一的,所以需要指定一个聚合函数,将具有相同(新)键的所有记录聚合为一个值。

自Kafka2.5以来,Kafka流还支持KStream#toTable()操作符。因此,也可以执行表格。toStream()。选择键(…)。toTable()。这两种方法都有优点和缺点。

使用toTable()的主要缺点是,它会根据新键重新分区输入数据,这会导致交错写入重新分区主题,从而导致无序数据。虽然第一种方法通过group By()使用相同的实现,但使用聚合函数有助于解决“冲突”的明确性。如果使用toTable()运算符,则会完成基于重新分区主题偏移顺序的“盲”上插(这实际上类似于其他答案中的代码示例)。

例子:

Key | Value
 A  | (a,1)
 B  | (a,2)

如果您在a上重新输入,您的输出表将是两个表中的一个(但它没有定义为一个):

Key | Value          Key | Value
 a  | 1               a  |  2

“重新设置”表的操作在语义上总是定义不清的。

 类似资料:
  • 我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象: 我想对这个KTable做一些聚合,基本上保留每个的记录数。所需的KTable数据如下所示: 假设属于上面的组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示: 我希望得到的聚合KTable结果反映这一点。例如。 我可能过度概括了这里

  • 如何识别主题的KTable物化何时完成? 例如,假设KTable只有几百万行。下面的伪代码: 在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”); 跟进问题: 约束 1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同

  • 我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 假设我将一个KStream聚合到一个KTable,将一个KStream聚合到一个KTable。和都不传递空值(删除事件被聚合为快照的状态属性)。此时,我们可以假设对于和聚合都有一个持久化的kafka changelog主题和一个rocksDB本地存储。然后,我的拓扑将与连接起来,生成一个连接的。也就是说,我的问题是和物化生命周期(包括changelog主题和本地rocksdb存储)。假设主题和主题

  • 使用Kafka流DSL是否可行?所有正在使用的主题都是,因此我希望模拟一个表,并且永远不要摆脱旧的值。 TL;DR;如何将一条消息转换成多条消息?