我试图连接两个Ktable流,似乎作为连接操作的一个输出,我两次得到与输出相同的消息。似乎在此操作过程中调用了两次值Joiner。
让我知道如何解决这个问题,以便只有一条消息作为加入操作的输出发出。
KTable<ID, Message> joinedMsg = msg1.join(msg2, new MsgJoiner());
由于两个ktable(msg1和msg2)之间的连接,我收到两条相同的消息。
启用缓存时通常会注意到这种行为。
如果两个表中的相同键都有更新,则每个表将独立刷新,因此每个表将触发连接,因此相同键将得到两个结果。
i、 e.有两个表:表1和表2。以下是表1和表2中接收到的输入数据:
table1 A:1
table2 A:A
在提交间隔刷新存储时。它刷新表1的存储,触发联接并生成A:1:A
。然后它将刷新表2,触发联接并生成A:1:A
您可以通过设置缓存来尝试禁用缓存。最大字节数。缓冲=0
。
另外,KTable/KTable联接中已经有一个未解决的问题。
我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我
我试图加入2 KTables。 合并功能非常简单,我只是将值从一个bean复制到另一个bean。 但由于某些原因,join函数在单个生成的记录上调用了两次。请参阅下面的流媒体/制作人配置 生产者配置- 接下来,我将提交每个流的单个记录。两个记录具有相同的密钥。我希望收到一条记录作为输出。 但是ValueJoiner触发了2次,我得到了2条相同的输出记录,而不是一条。在触发时间内-两个流中的值都存在
我正在尝试使用Apache Flink流API加入两个流,但没有任何内容加入,并且在阅读文档后我不知道我做错了什么 关键功能是
问题内容: 2个流: 给定可读流, 并且 获取包含 并 连接 流的惯用(简洁)方法是什么? 我不能做,因为这样流内容混杂在一起。 n个 流: 给定一个EventEmitter发出不确定数量的流,例如 一种将 所有流串联在一起的流 的惯用(简洁)方法是什么? 问题答案: 该合并的流包会连接流。自述文件中的示例: 我相信您必须立即添加所有流。如果队列为空,则自动结束。参见问题5。 该流流库是一个具有明
我有一个
Java8 API说: