我们目前正在实现一个过程(使用Kafka处理器API),我们需要将来自一个主题的两个相关事件(消息)的信息合并,然后转发这些合并的信息。事件源于物联网设备,由于我们希望保持其有序,因此源主题使用设备标识符作为键。事件还包含相关ID:
钥匙
{ deviceId: "..." }
留言
{ deviceId: "...", correlationId: "...", data: ...}
我们的第一种方法是创建一个具有连接状态存储的处理器,该存储存储每条传入的消息,使用相关ID作为键。这使我们能够查询存储以获取传入消息的相关ID,如果存储中已经存在具有相同ID的消息,我们可以组合信息,转发一个新事件并从存储中删除条目。因此,对于每个相关ID,都会发生以下情况:在某个时间点,具有该ID的第一条消息被消费和存储,而在其他时间点,具有该ID的第二条消息导致存储条目被删除。
国家密钥
{ correlationId: "..." }
状态值
{ event: { deviceId: "...", correlationId: "...", data: ... }}
但是现在我们想知道Kafka Streams是如何处理不同的键的。我们使用的是微服务方法,并且会有该服务的多个实例在运行。存储自动由内部主题支持。考虑重新缩放服务实例,s. t.源主题和状态主题的分区重新平衡。是否可能将特定相关ID的分区分配给另一个服务,而不是相应设备ID的分区?如果具有相同相关ID的第二个事件将被服务实例使用,则我们最终会遇到这样的情况吗?该服务实例无权访问已存储的第一个事件?
提前感谢!
如果我正确理解您的设置,那么是的,该方法是正确的,(重新)缩放将正常工作。
太长别读:如果流任务从机器A移动到机器B,那么它的所有状态也将被移动,而不管该状态是如何键控的(在您的情况下,它恰好是由coryn Id
键控的)。
更详细地说:
deviceId
键控)以确定性方式将输入分区映射到流任务来实现的。这确保了即使流任务跨机器/VM/容器移动,它们也会始终看到“它们的”输入分区=它们的输入数据。coryn Id
键控)。对于您的问题来说,重要的是状态是如何键控的并不重要。重要的是输入分区是如何键控的,因为这决定了哪些数据从输入主题流向特定的流任务(参见前面的要点)。当流任务跨机器/VM/容器移动时,它的所有状态也将被移动,以便它始终有“自己的”状态可用。商店由内部主题自动支持。
正如您所建议的,存储有一个内部主题(用于容错和弹性伸缩,因为当其流任务从a移动到B时,该内部主题用于重建状态存储)是一个实现细节。作为使用Kafka Streams API的开发人员,状态存储恢复的处理是自动且透明的。
当移动流任务及其状态存储时,Kafka Streams知道需要如何在流任务的新位置重建状态存储。你不需要担心。
是否可能将特定相关ID的分区分配给另一个服务,而不是相应设备ID的分区?
没有(这很好)。流任务总是知道如何重构其状态(1个状态存储),而不管该状态本身是如何设置键控的。
如果具有相同关联ID的第二个事件将被一个服务实例消耗,而该服务实例无法访问已经存储的第一个事件,那么我们是否会最终陷入这种情况?
没有(这很好)。
我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者
null 谢谢你的澄清。
在阅读了以下内容之后:JWT:什么是好的密钥,以及如何将其存储在node.js/express应用程序中?关于如何存储“密钥”以分配JWT令牌。我有安全问题。我的数据(消息,用户名等...)将被加密(在数据库中),只有授权用户才能解密(基于他们的私钥)。由于JWT令牌是使用存储在服务器上的1个“秘密密钥”生成的,所以如果攻击者获得了“秘密密钥”并获得了数据库的控制权,则可以伪造令牌,因此可以绕过“
我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将
全局状态存储与普通状态存储有何不同? 全局状态存储是否在不同机器上运行的所有实例中都有数据副本?由于全局状态存储不使用任何更改日志主题进行恢复,因此在重新启动时它的行为在我的场景中全局存储的源主题没有键。
我有几个用Java实现的Kafka消费者,我正在实现一个独立的应用程序来检查记录并删除它们。希望Kafka在压缩主题时删除状态存储。 现在...我对Kafka创建的不同类型的商店有点困惑。对于每一种类型的店铺,我想知道: Kafka删除相应主题中的旧唱片时是否删除? 删除相应主题中的记录时是否删除? 我们是不是被困住了? 我看到的商店类型有以下几种: null