当前位置: 首页 > 知识库问答 >
问题:

两个Kafka流Ktable连接操作两次发出消息

秦彦君
2023-03-14

我试图连接两个Ktable流,似乎作为连接操作的一个输出,我两次得到与输出相同的消息。似乎在此操作过程中调用了两次值Joiner。

让我知道如何解决这个问题,以便只有一条消息作为加入操作的输出发出。

KTable<ID, Message> joinedMsg = msg1.join(msg2, new MsgJoiner());

由于两个ktable(msg1和msg2)之间的连接,我收到两条相同的消息。

共有1个答案

洪伟兆
2023-03-14

启用缓存时通常会注意到这种行为。

如果两个表中的相同键都有更新,则每个表将独立刷新,因此每个表将触发连接,因此相同键将得到两个结果。

i、 e.有两个表:表1和表2。以下是表1和表2中接收到的输入数据:

table1 A:1
table2 A:A

在提交间隔刷新存储时。它刷新表1的存储,触发联接并生成A:1:A。然后它将刷新表2,触发联接并生成A:1:A

您可以通过设置缓存来尝试禁用缓存。最大字节数。缓冲=0

另外,KTable/KTable联接中已经有一个未解决的问题。

 类似资料:
  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 我试图加入2 KTables。 合并功能非常简单,我只是将值从一个bean复制到另一个bean。 但由于某些原因,join函数在单个生成的记录上调用了两次。请参阅下面的流媒体/制作人配置 生产者配置- 接下来,我将提交每个流的单个记录。两个记录具有相同的密钥。我希望收到一条记录作为输出。 但是ValueJoiner触发了2次,我得到了2条相同的输出记录,而不是一条。在触发时间内-两个流中的值都存在

  • 我正在尝试使用Apache Flink流API加入两个流,但没有任何内容加入,并且在阅读文档后我不知道我做错了什么 关键功能是

  • 问题内容: 2个流: 给定可读流, 并且 获取包含 并 连接 流的惯用(简洁)方法是什么? 我不能做,因为这样流内容混杂在一起。 n个 流: 给定一个EventEmitter发出不确定数量的流,例如 一种将 所有流串联在一起的流 的惯用(简洁)方法是什么? 问题答案: 该合并的流包会连接流。自述文件中的示例: 我相信您必须立即添加所有流。如果队列为空,则自动结束。参见问题5。 该流流库是一个具有明