当前位置: 首页 > 知识库问答 >
问题:

kafka流到ktable连接

匡安宜
2023-03-14

我想加入一个

  • kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。{“value”:“0”,“time”:1.540753118800291 e9,,“deviceIP”:“111.111.111.111”,“deviceName”:“kyz1”,“indicatorName”:“ifhcinoctets”}

键映射为:

map((key,value)->keyvalue.pair(value.deviceName+value.indicatorName,value))

我查看了KStream并打印了键和我使用的属性。看起来都很好。

    null

现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用,

  KStream<String, MyPojoClass> joined= datastream.join(table, 
          (data,table)->data
          ,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
          );

key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}

我有完全相同的东西通过ksql工作,但想做我自己的流应用程序。

共有1个答案

戴原
2023-03-14

现在,错误听起来太愚蠢了,我的PoJo类只有很少的属性是static:-(,导致了错误的键。

 类似资料:
  • 我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:

  • 如何识别主题的KTable物化何时完成? 例如,假设KTable只有几百万行。下面的伪代码: 在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”); 跟进问题: 约束 1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同

  • 我能做到写作和阅读的中间主题: 有没有简单的方法从中获取?这是我第一个使用Kafka Streams的应用程序,所以我可能错过了一些明显的东西。

  • 假设我将一个KStream聚合到一个KTable,将一个KStream聚合到一个KTable。和都不传递空值(删除事件被聚合为快照的状态属性)。此时,我们可以假设对于和聚合都有一个持久化的kafka changelog主题和一个rocksDB本地存储。然后,我的拓扑将与连接起来,生成一个连接的。也就是说,我的问题是和物化生命周期(包括changelog主题和本地rocksdb存储)。假设主题和主题

  • 我试图连接两个Ktable流,似乎作为连接操作的一个输出,我两次得到与输出相同的消息。似乎在此操作过程中调用了两次值Joiner。 让我知道如何解决这个问题,以便只有一条消息作为加入操作的输出发出。 由于两个ktable(msg1和msg2)之间的连接,我收到两条相同的消息。

  • 我正在尝试通过键连接两个(无窗口)并将结果写入