我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。
到目前为止,我有以下方法:
前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它
KStream-KStream(在累计流和主题C之上)
你能建议一个更好的方法或任何例子,我可以看看类似的实现在Java。
可以使用两个连续的联接:
KStream streamAB = streamA.join(streamB, ...);
// either:
KStream streamABC = streamA.selectKey(...) // set to the key as in streamC
.join(streamC, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to the key as in streamAB
KStream streamABC = streamA.join(streamCnew, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to a new join key
KStream streamABC = streamA.selectKey(...) // set to a new join key
.join(streamC, ...);
streamABC.selectKey(/* extract grouping field and set as key */).to("outputTopic");
我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?
我们想通过spring-kafka列出所有Kafka主题,以获得类似于kafka命令的结果: 在下面的服务中运行 getTopics() 方法时,我们会得到 配置: 服务: Kafka已经启动并运行,我们可以成功地从应用程序向主题发送消息。
这里是源连接器状态的输出: 这里是接收器连接器配置的输出: 这里是接收器连接器状态的输出:
我从教程中创建了示例Kafka Streams应用程序: 不幸的是,这个应用程序不读取输入流。我有一个来自PostgreSQL的JDBC源连接器,它正在处理来自一个数据库的精细流数据(我可以在本主题中的Kafka Connect UI数据上看到)。 我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不
我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。 来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.R
我如何通过可能从application.yml中插入“+死信”来设置相同的Kafka主题?我试过这样一件事: 但它给我创造了两个同名的不同主题。我正在等待一些建议,谢谢你的帮助!