在kafka流中定义拓扑时,可以添加全局状态存储。它将需要一个源主题以及一个ProcessorSupplier
。处理器接收记录,并可以在理论上对其进行转换,然后再将其添加到存储中。但是,在还原的情况下,记录会直接从源主题(更改日志)插入到全局状态存储中,从而跳过了最终在处理器中完成的转换。
+-------------+ +-------------+ +---------------+
| | | | | global |
|source topic -------------> processor +--------------> state |
|(changelog) | | | | store |
+-------------+ +-------------+ +---------------+
| ^
| |
+---------------------------------------------------------+
record directly inserted during restoration
StreamsBuilder#addGlobalStore(StoreBuilder
storeBuilder,字符串主题,已消耗的消耗量,ProcessorSupplier
stateUpdateSupplier)
将全局StateStore添加到拓扑。
根据文档
注意:您不应使用处理器 将转换后的记录插入全局状态存储 。该存储使用源主题作为changelog,并且在还原期间将 直接从源中
插入记录。应该使用此ProcessorNode来保持StateStore的最新状态。
同时,由于当前在kafka错误跟踪器上打开了主要错误:从主题还原状态时,将不会使用addGlobalStore上提供的KAFKA-7663自定义处理器,这完全可以解释文档中的内容,但似乎是可以接受的错误。
我想知道KAFKA-7663是否确实是个错误。根据文档,它似乎是这样设计的,在这种情况下,我很难理解用例。
有人可以解释这个底层API的主要用例吗?我唯一能想到的就是处理副作用,例如在处理器中执行一些日志操作。
额外的问题:如果源主题充当全局存储的变更日志,则由于保留已过期而从该主题删除记录时,是否会将其从全局状态存储中删除?还是仅在从变更日志中对整个商店进行还原之后,才会在商店中进行删除。
是的,这是一个很奇怪的小问题22,但是文档是正确的。全局状态存储的处理器不得对记录做任何事情,而应将其持久存储到存储中。
AFAIK,这不是一个哲学问题,只是一个实际问题。原因很简单,就是您观察到的行为…
Streams将输入主题视为存储的changelog主题,因此在还原过程中会绕过处理器(以及反序列化)。
状态恢复绕过任何处理的原因是 通常
变更日志中的数据与存储中的数据相同,因此对它进行任何新操作实际上都是错误的。另外,仅将字节从线中删除并将它们批量写入状态存储区就更有效。我之所以说“通常”,是因为在这种情况下,输入主题与普通的changelog主题并不完全一样,因为在存储放置期间它不会接收其写入。
对于它的价值,我也很难理解用例。看来,我们应该:
顺便说一句,如果您想要后一种行为,则可以立即应用转换,然后使用to(my-global-changelog)
来制造“
变更日志”主题,以对其进行近似。然后,您将创建全局存储以从您my-global-changelog
的输入而不是输入中读取。
因此,给您一个直接的答案,KAFKA-7663不是错误。我将对建议将票证转换为功能请求的票证进行评论。
最佳答案: 不能 为保留状态配置充当状态存储的更改日志的主题。实际上,这意味着您应该通过启用压缩来防止无限增长,并禁用日志保留。
实际上,旧数据因保留而丢失和丢失不是“事件”,消费者无法知道是否/何时发生。因此,无法响应此非事件而从状态存储中删除数据。就像您描述的那样,它将发生……记录将无限期地存在于全局存储中。如果/当替换实例时,新实例将从输入中恢复,并且(显然)仅接收当时该主题中存在的记录。因此,Streams集群作为一个整体将在全局状态不一致的情况下结束。这就是为什么您应该禁用保留。
从存储中“删除”旧数据的正确方法是将所需键的逻辑删除写入输入主题。然后,它将被正确传播到群集的所有成员,在还原期间正确应用,并由代理正确压缩。
希望对您有帮助。肯定的,请输入票证并帮助我们使API更加直观!
在kafka流中定义拓扑时,可以添加全局状态存储。它需要一个源主题以及。处理器接收记录,并在将其添加到存储区之前对其进行理论上的转换。但是在恢复的情况下,记录直接从源主题(changelog)插入到全局状态存储中,跳过在处理器中完成的最终转换。 StreamsBuilder#AddGlobalStore(StoreBuilder StoreBuilder,String topic,Consumen
问题内容: 我正在使用“使用严格”的函数形式,并且不希望Babel在转换后添加全局形式。问题是我正在使用一些未使用“严格使用”模式的库,并且在连接脚本后可能会引发错误 问题答案: Babel5 你会列入黑名单。例如,这是Gruntfile中的一个示例: Babel6 由于Babel 6现在已完全选择加入插件,而不是将其列入黑名单,因此您无需添加插件。如果您正在使用包含它的预设,那么我认为您必须创建
我使用springdoc openapi for java SpringBoot RESTful应用程序定义了以下: 是否可以将其全局应用于所有路径,而不必在代码中的任何地方添加注释到注释? 如果是,如何添加排除到不安全的路径?
我使用的是Confluent Community 6.0.1。三个节点Kafka集群: devKafka04:Kafka Broker1、Zookeeper 1 开发Kafka05: Kafka经纪人2, 动物园管理员 2 devKafka06:Kafka经纪人3,动物园管理员3 SSL加密已经在Kafka经纪人上运行良好。 我想添加SASL以启用Kafka和Zookeeper之间的相互身份验证。
问题内容: 我想创建一个在所有符合此条件的情况下强制执行某种情况的方法。 例如,如果我有一个这样的: 我想用扩展它,增加另一种情况: 这可能吗? 问题答案: 解决方法是使用带有变量的a。 注意:这是在 Swift 3 中完成的 以下是 Swift 3* 的实现 * 结构: 协议 调用中 注意: 请注意,该方法不能替代,仅在编译时不知道值时才使用此方法。
问题内容: 如何在代码后面创建可观察对象并生成下一个值?由于其他异步事件,我希望能够从代码的不同部分调用onNext。 这是我尝试过的,不起作用: 问题答案: 您需要的是某种主题。,,,等。 创建一个主题,然后可以订阅它。您也可以添加到流中。 例如: 一个更具体的用例(每次成功返回HTTP响应时,都会添加到可观察的流中): 正如另一位用户所说,如果您使用的是V5 ,请确保将所有设置都更改为。如果您