第1流:
[KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}
[KSTREAM-SOURCE-0000000000]: null, {"id": 2, "name": "jane", "age": 24, "updated_at": 1525774480784}
[KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}
[KSTREAM-SOURCE-0000000000]: null, {"id": 4, "name": "jamie", "age": 22, "updated_at": 1525774480875}
[KSTREAM-SOURCE-0000000000]: null, {"id": 5, "name": "jenny", "age": 27, "updated_at": 1525774482927}
[KSTREAM-SOURCE-0000000000]: null, {"id": 6, "name": "kishore", "age": 27, "updated_at": 1525775063908}
[KSTREAM-SOURCE-0000000000]: null, {"id": 7, "name": "purna", "age": 27, "updated_at": 1525775072006}
[KSTREAM-SOURCE-0000000000]: null, {"id": 8, "name": "xxx", "age": 10, "updated_at": 1525783464123}
[KSTREAM-SOURCE-0000000000]: null, {"id": 9, "name": "yyy", "age": 10, "updated_at": 1525783667644}
[KSTREAM-SOURCE-0000000000]: null, {"id": 10, "name": "zzz", "age": 10, "updated_at": 1525783741814}
流-2:
[KSTREAM-SOURCE-0000000002]: null, {"id": 1, "name": "d", "age": 67}
[KSTREAM-SOURCE-0000000002]: null, {"id": 2, "name": "e", "age": 78}
[KSTREAM-SOURCE-0000000002]: null, {"id": 12, "name": "d", "age": 67}
[KSTREAM-SOURCE-0000000002]: null, {"id": 21, "name": "e", "age": 78}
现在,我想对这两个流执行JOIN操作,并希望仅检索流-1中不存在于流-2中的行。我的输入流数据是AVRO格式
预期产出:
[KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}
[KSTREAM-SOURCE-0000000000]: null, {"id": 4, "name": "jamie", "age": 22, "updated_at": 1525774480875}
[KSTREAM-SOURCE-0000000000]: null, {"id": 5, "name": "jenny", "age": 27, "updated_at": 1525774482927}
[KSTREAM-SOURCE-0000000000]: null, {"id": 6, "name": "kishore", "age": 27, "updated_at": 1525775063908}
[KSTREAM-SOURCE-0000000000]: null, {"id": 7, "name": "purna", "age": 27, "updated_at": 1525775072006}
[KSTREAM-SOURCE-0000000000]: null, {"id": 8, "name": "xxx", "age": 10, "updated_at": 1525783464123}
[KSTREAM-SOURCE-0000000000]: null, {"id": 9, "name": "yyy", "age": 10, "updated_at": 1525783667644}
[KSTREAM-SOURCE-0000000000]: null, {"id": 10, "name": "zzz", "age": 10, "updated_at": 1525783741814}
那么,我应该执行哪个连接操作,以及如何实现预期的输出呢?谁能帮我实现这个目标
如果您查看这里的文档:kafka streams连接语义,您可能会使用左连接,并在设置stream2中的值时在值连接器中返回null。
一些伪代码:
stream1.leftJoin(stream2, valueJoiner);
valueJoiner = (s1, s2) -> {if (s2 != null) {
return false
} else {
return true;
}
};
免责声明:我没有测试过这个。
我正在尝试计算Flink中输入数据流(无窗口)的平均值 输入数据流来自套接字连接,形式为“键值”,如“x 5”
...这不起作用,因为我在管道执行之前调用get()。到目前为止,我还没有将为do_some_stuff函数所做的调整到“read”行 任何关于如何进行的建议或解决方案都将不胜感激。谢了!
> 简单说明一下我想实现的目标:我想对一个kafka流拓扑(使用拓扑测试驱动程序)进行功能测试,用于avro记录。 问题:无法“模拟”模式注册表尝试自动发布/读取模式 到目前为止,我尝试使用MockSchemaRegistryClient来模拟schemaRegistry,但我不知道如何将其链接到Avro Serde。 代码 阿夫罗斯德法 如果我运行测试没有它工作得很好(看起来一切都解决了) 但是
我正在使用从oracle db获取数据,并按下(两个键 我有一个Kafka流收听这个主题,并有avro Genericrecord。当我启动流时,我开始得到<code>ClassCastException:java.lang.Long不能强制转换为org.apache.avro.generic。GenericRecordconnect生成的架构具有数据类型为“long”的字段 有人对如何解决这个问
我正在构建一个电子商务应用程序,我目前正在处理两个数据馈送:订单执行和销售中断。由于各种原因,销售失败将是无效的执行。失败的销售将具有与订单相同的订单编号,因此连接位于订单编号和行项目编号上。 目前,我有两个主题-,和。两者都是使用Avro模式定义的,并使用SpecificRecord构建的。键是。 订单的字段:订单编号,时间戳,订单行,项目编号,数量 的字段: 通过运行 我需要将与左连接,并在输
我想使用spring-Kafka库使用spring boot配置的消费者来使用来自Kafka代理的消息,源是一个JDBC连接器,它负责从MySQL数据库提取消息,这些消息需要被使用 下面是我的application.yml文件