当前位置: 首页 > 知识库问答 >
问题:

Kafka流将KTable值映射为单独的值

须新
2023-03-14
('a', '123') -> ('a', '1,2,3') -> ('a1', '1'), ('a2', '2'), ('a3', '3')

使用Kafka流DSL是否可行?所有正在使用的主题都是compact,因此我希望模拟一个表,并且永远不要摆脱旧的值。

TL;DR;如何将一条消息转换成多条消息?

共有1个答案

谭宏盛
2023-03-14

不确定是否可以用DSL来表达。但使用处理器API实现这一点相当简单:

builder.stream("input-topic").transform(...).to("output-topic");

将键值存储附加到transform()中,并对每个输入记录执行以下操作:

  • 检查存储区中是否有对应的键值对
    • 如果是(即store.get()!=null),则从存储中取出旧值并将其拆分;将每个“拆分记录”的值替换为null并发出所有这些记录

 类似资料:
  • 我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:

  • 目前我们正在使用:Kafka Streams API(版本1.1.0)来处理来自Kafka集群的消息(3个代理,每个主题3个分区,复制因子为2)。安装的Kafka版本为1.1.1。 最终用户向我们报告数据消失的问题。他们报告说,突然之间他们看不到任何数据(例如,昨天他们可以在UI中看到n条记录,而第二天的morning table是空的)。我们检查了这个特定用户的changelog主题,看起来很奇

  • 我有一个用例,我的KTable是这样的。 KTable:orderTable 键:值 KTable:此表将位于groupBy值上,且计数列值将具有和 键:值

  • 如何识别主题的KTable物化何时完成? 例如,假设KTable只有几百万行。下面的伪代码: 在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”); 跟进问题: 约束 1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 我试图收集一个列表的结果,并将它们组织成一个地图,其中的值是一个地图: 我得到错误,因为对于列表中的不同值是相同的。 映射中包含的值应为: 当我尝试会删除其中一个条目