当前位置: 首页 > 知识库问答 >
问题:

使用kafka流、KStream-GlobalKtable联接进行数据充实

邵兴文
2023-03-14
null: { "ID":"1", "name":"XDFER"}
null: { "ID":"1", "name":"TRAFD"}
XDFER : "john"
TRAFD : "albert"
null: { "ID":"1", "name":"john"}
null: { "ID":"1", "name":"albert"}

我开始阅读Kafka Stream应用程序,在每个教程/示例中,通过比较KStream和GlobalkTable中的键来丰富数据。在我的情况下,我需要将KStream记录的值中的一个项与GlobalKTable中的一个键进行比较。如何实现这一点的任何想法或例子。

共有1个答案

上官和惬
2023-03-14

具有null键或null值的流的输入记录将被忽略,并且不会触发联接。

因此,您需要重新确定流的密钥,以便可以使用name作为密钥。

stream.selectKey(v-> v.get("name"))

重新键入后,您就可以将stream与GlobalKtable连接起来。

 类似资料:
  • 我对Kafka的溪流很陌生。我想执行以下KStream-GlobalKTable纯基于DSL的左联接操作,而不使用map操作。 和另一个输入主题,它是 ,其中value: 我要执行左联接操作是一个流,主数据是一个全局表,以实现结果值为 连接条件为 代码:

  • 我试图用GlobalKTable连接KStream,连接不完全在键上。 我想通过empIdOverLoginUserId的值通过employeesDetails的键将empIdOverLoginUserId与employeesDetails连接

  • 我需要帮助理解在Kafka2.2中使用max.task.idle.ms时的Kafka流行为。 我有一个KStream-KTable联接,其中KStream已被重新键入: 所有主题都有10个分区,为了测试,我将max.task.idle.ms设置为2分钟。myTimeExtractor只有在消息被标记为“快照”时才更新消息的事件时间:stream1中的每个快照消息都将其事件时间设置为某个常数T,st

  • 我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?

  • 最近,我开始为即将到来的项目阅读Kafka流,偶然发现了一个概念,即如果我们想要加入两个流,就需要进行共分区。我所能理解的是,如果我们有两个主题A和B,它们都必须有相同数量的分区,对于键“X”,这两个主题的分区号也必须相同。 主题A带分区A0, A1, A2主题B带分区B0, B1, B2 然后,键为“X”的消息必须分别在A0和B0中发布。 问题:为什么两个主题的分区数必须相同(对于“X”键),以

  • 我有一个Kafka Streams应用程序,其中我将读取“topic1”的KStream与读取“topic2”的GlobalKTable连接起来,然后再与读取“topic3”的GlobalKTable连接起来。 当我尝试同时推送消息到所有3个主题时,我会得到以下异常- org.apache.kafka.streams.errors.invalidStateStoreException 如果我在这些