KTable<Key1, GenericRecord> primaryTable = createKTable(key1, kstream, statestore-name);
KTable<Key2, GenericRecord> childTable1 = createKTable(key1, kstream, statestore-name);
KTable<Key3, GenericRecord> childTable2 = createKTable(key1, kstream, statestore-name);
primaryTable.leftJoin(childTable1, (primary, choild1) -> compositeObject)
.leftJoin(childTable2,(compositeObject, child2) -> compositeObject, Materialized.as("compositeobject-statestore"))
.toStream().to(""composite-topics)
对于我的应用程序,我使用KTable-Ktable连接,这样无论何时在主数据流或子数据流上接收数据,它都可以为所有三个表设置带有setters和getters的复合对象。这三个传入流具有不同的键,但是在创建KTable时,我为所有三个KTable设置相同的键。
我有一个分区的所有主题。当我在单个实例上运行应用程序时,一切都运行良好。我可以看到compositeObject填充了所有三个表中的数据。通过recordID和本地statestore名称,所有交互式查询也可以正常运行。
但当我运行同一应用程序的两个实例时,我看到compositeObject包含primary和child1数据,但child2仍然为空。即使我尝试使用交互式查询调用statestore,它也不会返回任何信息。
我正在使用spring cloud stream kafka streams库编写代码。
请说明不设置的原因,以及正确的解决方案。
Kafka Streams的缩放模型与进审量主题分区相耦合。因此,如果您的输入主题是单个分区,您就不能向外扩展。进审量主题分区决定了您的最大并行度。
因此,您需要创建具有更高并行度的新主题。
我找到的大部分信息都与主键连接有关。我知道外键连接对于Kafka流来说是一个相对较新的功能。我对它的规模感兴趣。我知道Kafka Streams并行性受到每个主题上的分区数的限制,但是我对增加输入主题分区意味着什么有一些疑问。 外键联接对共分区输入主题有相同的要求吗?也就是说,两个主题是否需要具有相同数量的分区 在应用程序在生产环境中运行数月或数年后,如何添加分区?支持每个KTable的chang
我有一个Kafka流应用程序,有两个数据源:事件和用户。 我有4个主题:事件,用户,用户2,用户事件 Users2与Users相同,用于演示GlobalKTable。 Events主题使用时间戳模式,因此当到达时间戳字段日期时,KStream将接收事件记录。 此时,我想为用户KTable中的每个用户ID以及新的事件ID创建一个用户事件记录;但我不知道如何遍历GlobalKTable或KTable来
问题内容: 我希望我的应用程序检查自身的另一个版本是否已在运行。 例如,启动后,用户单击以再次运行它,但是第二个实例意识到“哦,等等,已经在运行中”。并退出并显示一条消息。 问题答案: 您所寻找的内容可能最好用锁定文件来完成。锁定文件仅是指具有预定义位置且存在您的互斥体的文件。 测试程序启动时该文件是否存在,如果存在,请立即退出。在已知位置创建文件。如果程序正常退出,请删除锁定文件。 最好的办法是
我怎么能做到..以编程方式有什么方法可以做到这一点。 Pls救命!
请参阅下面的更新以显示潜在的解决方案 我们的应用程序使用2个主题作为KTables,执行左连接,并输出到一个主题。在测试过程中,我们发现当我们的输出主题只有一个分区时,这项功能可以正常工作。当我们增加分区的数量时,我们注意到生成到输出主题的消息数量减少了。 在启动应用程序之前,我们用多个分区配置测试了这一理论。使用1个分区,我们可以看到100%的消息。使用2,我们可以看到一些消息(少于50%)。对
这是我的节点文件 我在上有HTML 和上的节点应用程序 我也转发了3000端口到80; 我想在这个服务器上的一个不同的端口上运行多个应用程序。 我应该为其他应用程序配置什么。