当前位置: 首页 > 知识库问答 >
问题:

KTable值字段上的Kafka流分组

龙智
2023-03-14

我有一个用例,我的KTable是这样的。

KTable:orderTable

键:值

KTable:OrderByIdTable=>此表将位于groupBy值字段(id)上,且计数列值将具有ID1=(12+5)ID2=(10+11)

键:值

{id1}:{17}

{id2}:{21}

         final KTable<String, Order> orderTable = builder.table("order-topic");
         Don't know how to do this further.....
         final KTable<String,Long> orderByIdTable = ?

共有1个答案

芮意
2023-03-14

下面是一个代码示例(仅使用Java基元类型,这使我更快地组合在一起),演示了如何对KTable进行重键和重分区,从而生成新的KTable。您应该能够很容易地使其适应将ktable 转换为ktable 的示例。

就我个人而言,我会为您的用例选择Variant 2。

下面的例子。没有经过充分测试,可能是tombstone记录(带有非null键但值为null的消息,表示应该从表中删除该键)没有得到正确处理。

final StreamsBuilder builder = new StreamsBuilder();
final KTable<Integer, String> table = builder.table(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()));

// Variant 1 (https://docs.confluent.io/current/streams/faq.html#option-1-write-kstream-to-ak-read-back-as-ktable)
// Here, we re-key the KTable, write the results to a new topic, and then re-read that topic into a new KTable.
table
    .toStream()
    .map((key, value) -> KeyValue.pair(value, key))
    .to(outputTopic1, Produced.with(Serdes.String(), Serdes.Integer()));
KTable<String, Integer> rekeyedTable1 =
    builder.table(outputTopic1, Consumed.with(Serdes.String(), Serdes.Integer()));

// Variant 2 (https://docs.confluent.io/current/streams/faq.html#option-2-perform-a-dummy-aggregation)
// Here, we re-key the KTable (resulting in a KGroupedTable), and then perform a dummy aggregation to turn the
// KGroupedTable into a KTable.
final KTable<String, Integer> rekeyedTable2 =
    table
        .groupBy(
            (key, value) -> KeyValue.pair(value, key),
            Grouped.with(Serdes.String(), Serdes.Integer())
        )
        // Dummy aggregation
        .reduce(
            (aggValue, newValue) -> newValue, /* adder */
            (aggValue, oldValue) -> oldValue  /* subtractor */
        );
rekeyedTable2.toStream().to(outputTopic2, Produced.with(Serdes.String(), Serdes.Integer()));
 类似资料:
  • 目前我们正在使用:Kafka Streams API(版本1.1.0)来处理来自Kafka集群的消息(3个代理,每个主题3个分区,复制因子为2)。安装的Kafka版本为1.1.1。 最终用户向我们报告数据消失的问题。他们报告说,突然之间他们看不到任何数据(例如,昨天他们可以在UI中看到n条记录,而第二天的morning table是空的)。我们检查了这个特定用户的changelog主题,看起来很奇

  • 我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象: 我想对这个KTable做一些聚合,基本上保留每个的记录数。所需的KTable数据如下所示: 假设属于上面的组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示: 我希望得到的聚合KTable结果反映这一点。例如。 我可能过度概括了这里

  • 如何识别主题的KTable物化何时完成? 例如,假设KTable只有几百万行。下面的伪代码: 在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”); 跟进问题: 约束 1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同

  • 使用Kafka流DSL是否可行?所有正在使用的主题都是,因此我希望模拟一个表,并且永远不要摆脱旧的值。 TL;DR;如何将一条消息转换成多条消息?

  • 我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我