我试图加入KStream与KTable。如果没有连接,我可以从中间主题“book属性-by-id”中阅读。
KTable的示例消息:
{key: {id: 1}
value: {id: 1, attribute_name: "weight"}}
KStream的示例消息:
{key: {id: 1},
value: {id: 1, book_id: 1, attribute_id: 1, value: 200}}
“最终聚合”主题的所需输出:
{key: {id: 1},
value: {book_id: 1, attribute_name: "weight", value: 200}}
{key: {id: 1},
value: {book_id: 1, attribute_name: "number_of_pages", value: 450}}
这是密码
KStream<DefaultId, BookAttribute> bookAttributeStream = builder.stream(bookAttributeTopic, Consumed.with(defaultIdSerde, bookAttributeSerde));
KStream<DefaultId, BookValueInt> bookValueIntStream = builder.stream(bookValueIntTopic, Consumed.with(defaultIdSerde, bookValueIntSerde));
bookAttributeStream
.selectKey((k, v) -> k.getId())
.to("book-attribute-by-id", Produced.with(Serdes.Integer(), bookAttributeSerde));
KTable<Integer, BookAttribute> bookAttributeByIdTable = builder.table("book-attribute-by-id", Consumed.with(Serdes.Integer(), bookAttributeSerde));
// when the snippet below is commented out, consuming "book-attribute-by-id" works.
bookValueIntStream
.selectKey((k, v) -> v.getAttribute_id())
.join(bookAttributeByIdTable, (intValue, attribute) -> {
System.out.println("intValue: " + intValue);
System.out.println("attribute: " + attribute);
return new BookAttributeValue(intValue, attribute);
});
加入KStream时出现异常
线程“xxx-StreamThread-1”组织中出现异常。阿帕奇。Kafka。溪流。错误。TopologyBuilderException:无效的拓扑构建:未找到流线程[xxx-StreamThread-1]主题:组织上id的book属性。阿帕奇。Kafka。溪流。加工机内部。StreamPartitionAssignor$CopPartitionedTopicsValidator。验证(StreamPartitionAssignor.java:792)
我假设您使用的是kafka streams 1.0.0
问题是您必须为流创建输入主题。
在您的案例中,主题是:bookattributebyid
,以及变量值的主题:bookAttributeTopic
,bookValueIntTopic
。
对于连接Kafka Streams必须确保,连接主题中的分区数量相等。当它试图获取主题的元数据时,会引发异常:book属性-by-id
。
在运行应用程序之前,您必须手动创建book属性by id
主题
在较新版本的kafka streams中,在验证分区数之前检查主题是否存在。
我将一个KStream与一个KTable左联接,但我没有看到输出主题的任何输出: 如果我绕过连接,直接将输入主题输出到输出,我会看到消息到达。我已经将联接更改为左联接,添加了一些printlns以查看何时提取键(但控制台上没有打印任何内容)。而且我每次都使用Kafka流重置工具,所以从头开始。我的想法快用完了。此外,我还添加了一些对存储的测试访问,它可以工作,并且包含来自流的键(尽管这不应该因为左
我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?
我正在尝试以下列方式使用kafka流实现事件源模式。 我在一家安全服务公司工作,处理两个用例: 注册用户,处理 应生成 。 更改用户名,处理 应生成 。 我有两个主题: 命令主题,每个命令都是键控的,密钥是用户的电子邮件。例如: 实现思想可以用以下拓扑表示: 对于这个拓扑,我使用的是。 此拓扑的更显式版本: 我遇到的问题: 在具有现有记录的命令主题上启动流应用程序: 在构建这样的拓扑时,我缺少什么
有没有人发帖回应这个问题?还有其他帖子没有答案。我们的情况是,在流流程的第一步中,我们将消息推送到支持KTable的主题上。然后我们从这些消息中提取少量数据并将其传递出去。我们正在对较小数量的数据进行多次计算,以便进行分组和聚合。在流式处理的最后,我们只想通过一个KTable连接回原来的主题,以便再次获取完整的消息内容。联接的结果只是数据的一个子集,因为它无法在KTable中找到条目。 这只是问题
我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所
我正在尝试构建以下拓扑: > 使用Debezium连接器,我拉出2个表(我们称它们为表A和表DA)。根据DBZ,存储表行的主题具有{before:“...”,after:“...”}结构。 在我的拓扑中,第一步是从这两个“表”主题创建“干净的”KStreams。那里的子拓扑大致如下所示: 请注意,我显式地分配记录时间,因为表行将在它们最初发布后被CDC'ed“年”。该函数目前正在做的是伪造从201