当前位置: 首页 > 知识库问答 >
问题:

Spring和Kafka:加入3个Kafka主题生成输出Kafka流

宁飞宇
2023-03-14

我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。

到目前为止,我有以下方法:

前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它

KStream-KStream(在累计流和主题C之上)

你能建议一个更好的方法或任何例子,我可以看看类似的实现在Java。

共有1个答案

董俊晖
2023-03-14

可以使用两个连续的联接:

KStream streamAB = streamA.join(streamB, ...);
// either:
KStream streamABC = streamA.selectKey(...) // set to the key as in streamC
                           .join(streamC, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to the key as in streamAB
KStream streamABC = streamA.join(streamCnew, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to a new join key
KStream streamABC = streamA.selectKey(...) // set to a new join key
                           .join(streamC, ...);

streamABC.selectKey(/* extract grouping field and set as key */).to("outputTopic");
 类似资料:
  • 我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?

  • 我们想通过spring-kafka列出所有Kafka主题,以获得类似于kafka命令的结果: 在下面的服务中运行 getTopics() 方法时,我们会得到 配置: 服务: Kafka已经启动并运行,我们可以成功地从应用程序向主题发送消息。

  • 这里是源连接器状态的输出: 这里是接收器连接器配置的输出: 这里是接收器连接器状态的输出:

  • 我从教程中创建了示例Kafka Streams应用程序: 不幸的是,这个应用程序不读取输入流。我有一个来自PostgreSQL的JDBC源连接器,它正在处理来自一个数据库的精细流数据(我可以在本主题中的Kafka Connect UI数据上看到)。 我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不

  • 我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。 来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.R

  • 我如何通过可能从application.yml中插入“+死信”来设置相同的Kafka主题?我试过这样一件事: 但它给我创造了两个同名的不同主题。我正在等待一些建议,谢谢你的帮助!