当前位置: 首页 > 知识库问答 >
问题:

如何使用JAVA对Kafka流中的AVRO格式化数据执行连接操作

太叔凌龙
2023-03-14

第1流:

 [KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 2, "name": "jane", "age": 24, "updated_at": 1525774480784}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 4, "name": "jamie", "age": 22, "updated_at": 1525774480875}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 5, "name": "jenny", "age": 27, "updated_at": 1525774482927}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 6, "name": "kishore", "age": 27, "updated_at": 1525775063908}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 7, "name": "purna", "age": 27, "updated_at": 1525775072006}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 8, "name": "xxx", "age": 10, "updated_at": 1525783464123}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 9, "name": "yyy", "age": 10, "updated_at": 1525783667644}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 10, "name": "zzz", "age": 10, "updated_at": 1525783741814}

流-2:

[KSTREAM-SOURCE-0000000002]: null, {"id": 1, "name": "d", "age": 67}
[KSTREAM-SOURCE-0000000002]: null, {"id": 2, "name": "e", "age": 78}
[KSTREAM-SOURCE-0000000002]: null, {"id": 12, "name": "d", "age": 67}
[KSTREAM-SOURCE-0000000002]: null, {"id": 21, "name": "e", "age": 78}

现在,我想对这两个流执行JOIN操作,并希望仅检索流-1中不存在于流-2中的行。我的输入流数据是AVRO格式

预期产出:

 [KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 4, "name": "jamie", "age": 22, "updated_at": 1525774480875}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 5, "name": "jenny", "age": 27, "updated_at": 1525774482927}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 6, "name": "kishore", "age": 27, "updated_at": 1525775063908}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 7, "name": "purna", "age": 27, "updated_at": 1525775072006}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 8, "name": "xxx", "age": 10, "updated_at": 1525783464123}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 9, "name": "yyy", "age": 10, "updated_at": 1525783667644}
 [KSTREAM-SOURCE-0000000000]: null, {"id": 10, "name": "zzz", "age": 10, "updated_at": 1525783741814}

那么,我应该执行哪个连接操作,以及如何实现预期的输出呢?谁能帮我实现这个目标

共有1个答案

戚阳
2023-03-14

如果您查看这里的文档:kafka streams连接语义,您可能会使用左连接,并在设置stream2中的值时在值连接器中返回null。

一些伪代码:

stream1.leftJoin(stream2, valueJoiner);

valueJoiner = (s1, s2) -> {if (s2 != null) {
      return false
   } else {
     return true;
   } 
};

免责声明:我没有测试过这个。

 类似资料:
  • 我正在尝试计算Flink中输入数据流(无窗口)的平均值 输入数据流来自套接字连接,形式为“键值”,如“x 5”

  • ...这不起作用,因为我在管道执行之前调用get()。到目前为止,我还没有将为do_some_stuff函数所做的调整到“read”行 任何关于如何进行的建议或解决方案都将不胜感激。谢了!

  • > 简单说明一下我想实现的目标:我想对一个kafka流拓扑(使用拓扑测试驱动程序)进行功能测试,用于avro记录。 问题:无法“模拟”模式注册表尝试自动发布/读取模式 到目前为止,我尝试使用MockSchemaRegistryClient来模拟schemaRegistry,但我不知道如何将其链接到Avro Serde。 代码 阿夫罗斯德法 如果我运行测试没有它工作得很好(看起来一切都解决了) 但是

  • 我正在使用从oracle db获取数据,并按下(两个键 我有一个Kafka流收听这个主题,并有avro Genericrecord。当我启动流时,我开始得到<code>ClassCastException:java.lang.Long不能强制转换为org.apache.avro.generic。GenericRecordconnect生成的架构具有数据类型为“long”的字段 有人对如何解决这个问

  • 我正在构建一个电子商务应用程序,我目前正在处理两个数据馈送:订单执行和销售中断。由于各种原因,销售失败将是无效的执行。失败的销售将具有与订单相同的订单编号,因此连接位于订单编号和行项目编号上。 目前,我有两个主题-,和。两者都是使用Avro模式定义的,并使用SpecificRecord构建的。键是。 订单的字段:订单编号,时间戳,订单行,项目编号,数量 的字段: 通过运行 我需要将与左连接,并在输

  • 我想使用spring-Kafka库使用spring boot配置的消费者来使用来自Kafka代理的消息,源是一个JDBC连接器,它负责从MySQL数据库提取消息,这些消息需要被使用 下面是我的application.yml文件