当前位置: 首页 > 知识库问答 >
问题:

Kafka streams-连接两个ktables会调用连接函数两次

羊舌新荣
2023-03-14

我试图加入2 KTables。

KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(RecordBean.class),
    bidTopic, RECORDS_STORE);

KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(ImpressionBean.class),
    impressionTopic, IMPRESSIONS_STORE);

KTable<String, RecordBean> mergedByTxId = recordsTable
    .join(impressionsTable, merge());

合并功能非常简单,我只是将值从一个bean复制到另一个bean。

public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() {
return (v1, v2) -> {
  v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount());
  return v1;
};

但由于某些原因,join函数在单个生成的记录上调用了两次。请参阅下面的流媒体/制作人配置

Properties streamsConfiguration = new Properties();
streamsConfiguration
    .put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("kafka-streams-tmp")
    .getAbsolutePath());

return streamsConfiguration;

生产者配置-

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return producerConfig;

接下来,我将提交每个流的单个记录。两个记录具有相同的密钥。我希望收到一条记录作为输出。

 IntegrationTestUtils.produceKeyValuesSynchronously(bidsTopic,
    Arrays.asList(new KeyValue("1", getRecordBean("1"))),
    getProducerProperties());

IntegrationTestUtils.produceKeyValuesSynchronously(impressionTopic,
    Arrays.asList(new KeyValue("1", getImpressionBean("1"))),
    getProducerProperties());

List<KeyValue<String, String>> parsedRecord =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
        outputTopic, 1);

但是ValueJoiner触发了2次,我得到了2条相同的输出记录,而不是一条。在触发时间内-两个流中的值都存在-我无法获取触发第二次执行的内容。

没有加入-我不能复制这种行为。我找不到任何2 ktable连接的工作示例-所以无法理解我的方法有什么问题。

添加演示相同行为的简单代码

KStreamBuilder builder = new KStreamBuilder();

KTable<String, String> first = builder.table("stream1", "storage1");
KTable<String, String> second = builder.table("stream2", "storage2");

KTable<String, String> joined = first.join(second, (value1, value2) -> value1);

joined.to("output");

KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties());

streams.start();

IntegrationTestUtils.produceKeyValuesSynchronously("stream1",
    Arrays.asList(new KeyValue("1", "first stream")),
    getProducerProperties());

IntegrationTestUtils.produceKeyValuesSynchronously("stream2",
    Arrays.asList(new KeyValue("1", "second stream")),
    getProducerProperties());

List<KeyValue<String, String>> parsedRecord =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
        "output", 1);

共有2个答案

欧阳俊捷
2023-03-14

我在两个KTable之间使用leftJoin发现了同样的行为,并在谷歌搜索后偶然发现了这篇文章。我不知道您使用的是哪个版本的Kafka流,但是在调试了汇流代码后,Kafka流2.0.1版本似乎故意在某些类型的连接中发送旧的和新的值,所以您会收到两次对ValueJoiner的调用。

看看org的实现。阿帕奇。Kafka。溪流。kstream。内部。KTableImpl#buildJoin构建连接拓扑,以及组织。阿帕奇。Kafka。溪流。kstream。内部。KTableKTableRightJoin。KTableKTableRightJoinProcessor#进程,它在运行时调度它。在某些情况下,显然要做两次。

以下是这种行为的一些背景https://issues.apache.org/jira/browse/KAFKA-2984

郑胡媚
2023-03-14

我得到了以下解释后张贴类似的问题,以汇流邮件组。

我认为这可能与缓存有关。这两个表的缓存是独立刷新的,因此有可能两次获得相同的记录。如果stream1和stream2都收到同一密钥的记录,并且缓存刷新,则:

stream1中的缓存将刷新、执行连接并生成记录。

stream2中的缓存将刷新、执行连接并生成记录。

从技术上讲,这是可以的,因为连接的结果是另一个KTable,因此KTable中的值将是正确的值。

将以下变量设置为0后,StreamsConfig。缓存\最大\字节\缓冲\配置,0-问题已解决。我仍然得到了2条记录——但现在有一条记录是用null连接的——根据上面提供的连接语义文档,它的行为非常清楚。

 类似资料:
  • 问题内容: 有简单的解决方案,可通过串联两个或java 。由于是经常使用的。是否有任何简单的方法来连接两个? 这是我的想法: 它可以工作,但实际上可以转换为,然后再次转换回。 问题答案: 您可以使用协力让这件事没有任何自动装箱拆箱或完成。这是它的外观。 请注意,返回,然后将其与另一个串联,然后再收集到数组中。 这是输出。 [1、34、3、1、5]

  • 问题内容: 我有两个表,如下所示: 我想列出参加活动17的所有人(包括学生和教师)的名字。无论如何,我可以获得以下结果: 无需创建新表(仅使用表达式或派生关系的嵌套)? 在actid上加入JOIN会得到如下结果: 我想我需要一种串联形式? 问题答案: 您可能(或可能不需要)对ID不唯一的内容进行处理,例如

  • EasyReact 的重点就是让节点之间的数据流动起来,所以连接节点是很重要的。 如何连接两个节点 两个节点是通过变换来连接的,在源码目录 EasyReact/Classes/Core/NodeTransforms 中我们默认实现了了很多的变换,你也可以通过继承 EZRTransform 类来实现自己的变换,一旦我们创建好一个变换后,就可以通过如下方式进行连接了: EZRMutableNode<N

  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 问题内容: 我正在JPanel中绘制两个形状(圆形),我需要用一条 线将它们连接起来。我这样做是通过简单地获得圆的中点并 相互连接来实现的。 问题在于,现在我需要制作单向线,该单向线 的末尾带有“箭头”,以指出线的前进方向。所以现在我不能 使用圆的中点,因为我需要从 边界到边界相互连接,因此“箭头”可以正确显示。 在我上一次尝试的结果是,没有任何好处: PS:在屏幕截图中,我并不是为了看到 直线的

  • 问题内容: 我有两个具有相同结构的JSON对象,我想使用Javascript将它们连接在一起。是否有捷径可寻? 问题答案: 根据注释中的描述,您只需执行一个数组concat: