当前位置: 首页 > 知识库问答 >
问题:

用于添加全局存储的Kafka streams用例

古弘
2023-03-14

在kafka流中定义拓扑时,可以添加全局状态存储。它需要一个源主题以及processorsupplier。处理器接收记录,并在将其添加到存储区之前对其进行理论上的转换。但是在恢复的情况下,记录直接从源主题(changelog)插入到全局状态存储中,跳过在处理器中完成的最终转换。

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

StreamsBuilder#AddGlobalStore(StoreBuilder StoreBuilder,String topic,Consumented Consuments,ProcessorSupplier stateUpdateSupplier)将全局StateStore添加到拓扑中。

根据文件

同时,由于kafka bug tracker上的主要bug目前已经打开:当从主题还原状态时,没有使用addGlobalStore上提供的KAFKA-7663自定义处理器,该主题准确地解释了文档中所述的内容,但似乎是一个公认的bug。

我想知道Kafka-7663是否确实是一个bug或不是。根据文档,它似乎是这样设计的,在这种情况下,我很难理解用例。
有人能解释一下这个低级API的主要用例吗?我唯一能想到的就是处理副作用,例如,在处理器中执行一些日志操作。

额外的问题:如果源主题充当全局存储的changelog,当一条记录因为保留时间过期而从主题中删除时,它会从全局状态存储中删除吗?或者只有在从Changelog恢复完整存储后才会在存储中删除。

共有1个答案

公羊凌
2023-03-14

是的,这是一个很奇怪的小陷阱,但文档是正确的。全局状态存储的处理器不能对记录执行任何操作,而是要将它们保存到存储中。

阿法克,这不是哲学问题,只是一个实际问题。原因只是你观察到的行为...Streams将输入主题视为存储区的changelog主题,因此在还原期间绕过处理器(以及反序列化)。

状态还原绕过任何处理的原因是,changelog中的数据通常与存储区中的数据相同,因此对其执行任何新操作实际上都是错误的。另外,将字节从线上取下来并将其大容量写入状态存储更有效。我之所以说“通常”,是因为在本例中,input主题与普通的changelog主题并不完全相同,因为它在存储放入期间不会接收写入。

    null

顺便说一句,如果您想要后一种行为,您现在可以通过应用转换并使用to(my-global-changelog)来制造一个“changelog”主题来近似它。然后,创建从my-global-changelog而不是输入读取全局存储区。

所以,给你一个直接的答案,Kafka-7663不是一个bug。我将对提议将其转化为特性请求的票进行评论。

额外的回答:作为状态存储的changelogs的主题不能被配置为保留。实际上,这意味着您应该通过启用压缩来防止无限增长,并禁用日志保留。

在实践中,旧数据失去保留并被删除不是一个“事件”,消费者无法知道它是否/何时发生。因此,不可能从状态存储中删除数据以响应此非事件。就像你描述的那样...这些唱片将无限期地放在全球商店里。如果/当一个实例被替换时,新的实例将从输入恢复,并且(显然)只接收当时主题中存在的记录。因此,流群集作为一个整体将最终导致全局状态的不一致视图。这就是为什么您应该禁用保留。

从存储中“删除”旧数据的正确方法是在输入主题中为所需的键写一个墓碑。然后,这将被正确地传播到集群的所有成员,在恢复期间被正确地应用,并被代理正确地压缩。

我希望这一切都有帮助。当然,请在票子上编钟,并帮助我们塑造API更直观!

 类似资料:
  • 问题内容: 在kafka流中定义拓扑时,可以添加全局状态存储。它将需要一个源主题以及一个。处理器接收记录,并可以在理论上对其进行转换,然后再将其添加到存储中。但是,在还原的情况下,记录会直接从源主题(更改日志)插入到全局状态存储中,从而跳过了最终在处理器中完成的转换。 StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder,字符串主题,已消耗

  • 问题内容: 我正在使用“使用严格”的函数形式,并且不希望Babel在转换后添加全局形式。问题是我正在使用一些未使用“严格使用”模式的库,并且在连接脚本后可能会引发错误 问题答案: Babel5 你会列入黑名单。例如,这是Gruntfile中的一个示例: Babel6 由于Babel 6现在已完全选择加入插件,而不是将其列入黑名单,因此您无需添加插件。如果您正在使用包含它的预设,那么我认为您必须创建

  • 对于我们的应用程序,我们需要能够提供团体访问文件。每个用户可能拥有大量的组,因此使用“自定义令牌”解决方案是没有意义的(无论如何,这是非常尴尬的)。 我发现,Firebase的存储安全规则非常有限。主要问题是,我们将组定义保留在存储安全规则无法访问的Firestore中。 为了克服这个问题,我们决定在每个上传文件的元数据中包含一个“令牌”,组中的任何人都可以访问这个令牌。当他们下载一个文件时,他们

  • 本文向大家介绍gradle 添加存储库,包括了gradle 添加存储库的使用技巧和注意事项,需要的朋友参考一下 例子 您必须将Gradle指向插件的位置,以便Gradle可以找到它们。为此添加一个repositories { ... }到您的build.gradle。 这是添加三个存储库(JCenter,Maven存储库和提供Maven样式的依赖关系的自定义存储库)的示例。            

  • 更新:在联系了Firebase的支持后,他们告诉我,在修复了他们的后端之后,这个问题确实被修复了。

  • 有没有办法添加Firebase 3存储安全规则来限制单个认证用户可以上传多少个文件?例如每个用户100个文件。 或者更新Firebase数据库文件数,一旦有人将文件上传到存储器,然后验证文件数。 试图解决问题,如何处理用户上传无限量数据到存储器的能力。