有没有人发帖回应这个问题?还有其他帖子没有答案。我们的情况是,在流流程的第一步中,我们将消息推送到支持KTable的主题上。然后我们从这些消息中提取少量数据并将其传递出去。我们正在对较小数量的数据进行多次计算,以便进行分组和聚合。在流式处理的最后,我们只想通过一个KTable连接回原来的主题,以便再次获取完整的消息内容。联接的结果只是数据的一个子集,因为它无法在KTable中找到条目。
这只是问题的开始。在另一种情况下,我们使用KTables作为查找的索引,目的是丰富进入的数据。把这些查找看作是识别我们以前是否在流消息中看到过特定的模式。如果我们已经看到了模式,我们希望用一个从现有KTable中提取的ID(用于分组)来标记它。如果我们之前没有看到这个模式,我们将为它分配一个ID,并将它放回KTable中,用于标记未来的消息。我们发现的是,不能保证KTable中的信息会出现在将来的消息中。这种缺乏保证的情况似乎使KTables变得毫无用处。我们不明白为什么论坛上很少讨论这个问题。
最后,在使用streams应用程序的单个实例运行时,这些似乎都不是问题。然而,当我们的数据变大,我们被迫拥有10个应用程序实例时,一切都崩溃了。同样,我们也不可能使用GlobalKTables这样的东西,因为有太多的数据要加载到一台机器的内存中。
我们能做什么?我们目前正计划放弃所有KTable,而使用类似Hazelcast的东西来存储查找数据。我们是不是应该搬到Hazelcast Jet然后把Kafka streams一起来?
新增流:Kafka数据流
很抱歉这个不回答的回答,但是我没有足够的点来评论...
你描述的行为肯定与我对流的理解和经验不一致。如果您可以共享导致问题的拓扑(或简化的拓扑),那么可能有一个简单的错误,我们可以指出。
一旦我们得到更多的信息,我就可以把它编辑成一个“真实”的答案...
谢谢!--约翰
我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?
我目前正在尝试使用KStream到KTable的连接来执行Kafka主题的充实。对于我的概念证明,我目前有一个Kafka流,其中有大约600,000条记录,它们都有相同的键,还有一个KTable,它是从一个主题创建的,其中KTable主题中的键与创建KStream的主题中的600,000条记录中的键匹配。 当我使用左联接(通过下面的代码)时,所有记录在ValueJoiner上都返回NULL。 下面
我正在尝试以下列方式使用kafka流实现事件源模式。 我在一家安全服务公司工作,处理两个用例: 注册用户,处理 应生成 。 更改用户名,处理 应生成 。 我有两个主题: 命令主题,每个命令都是键控的,密钥是用户的电子邮件。例如: 实现思想可以用以下拓扑表示: 对于这个拓扑,我使用的是。 此拓扑的更显式版本: 我遇到的问题: 在具有现有记录的命令主题上启动流应用程序: 在构建这样的拓扑时,我缺少什么
我试图加入KStream与KTable。如果没有连接,我可以从中间主题“book属性-by-id”中阅读。 KTable的示例消息: KStream的示例消息: “最终聚合”主题的所需输出: 这是密码 加入KStream时出现异常 线程“xxx-StreamThread-1”组织中出现异常。阿帕奇。Kafka。溪流。错误。TopologyBuilderException:无效的拓扑构建:未找到流线
我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我
我正在尝试通过键连接两个(无窗口)并将结果写入