当前位置: 首页 > 知识库问答 >
问题:

Kafka流处理器API清除状态存储

苗冯浩
2023-03-14

我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。

KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("storeName");
StoreBuilder<KeyValueStore<String, StoreObject>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier,
            Serdes.String(), storeObjectSerde);   
topology.addSource("SourceReadername", stringDeserializer, sourceSerde.deserializer(), "sourceTopic")
.addProcessor("processor", () -> new CustomProcessor("store"), FillReadername)
.addStateStore(storeBuilder, "processor") // define store for processor
.addSink("sinkName", "outputTopic", stringSerializer, resultSerde.serializer(),
                    Fill_PROCESSOR);

我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者可能将DSL与处理器API混合使用,以便能够将两者结合起来,并将任何主题中的事件发送到Process方法,以便在清理主题中接收到事件时运行清理代码?

谢谢

共有1个答案

温亮
2023-03-14

您只需要添加另一个输入主题(add:source)并添加从该主题转换消息的处理器,然后根据它们从状态存储中删除staff。请注意,这些主题应该使用相同的键(因为分区)。

 类似资料:
  • 我们目前正在实现一个过程(使用Kafka处理器API),我们需要将来自一个主题的两个相关事件(消息)的信息合并,然后转发这些合并的信息。事件源于物联网设备,由于我们希望保持其有序,因此源主题使用设备标识符作为键。事件还包含相关ID: 钥匙 留言 我们的第一种方法是创建一个具有连接状态存储的处理器,该存储存储每条传入的消息,使用相关ID作为键。这使我们能够查询存储以获取传入消息的相关ID,如果存储中

  • 我正在尝试理解。 据我所知,在这种类型的流处理器中,它使用来维护某种状态。 我开始知道,实现的方法之一是使用。假设以下(并且只有一个处理器是) A- 假设sp只监听一个Kafka主题,比如带有10个分区的。 我观察到,当应用程序启动时(在不同的物理机器上有2个实例,并且=5),然后对于,它会创建目录结构,其内容如下: 0_0,0_1,0_2......0_9(每台机器有5个分区)。 我正在浏览一些

  • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将

  • 对于传入记录,我需要验证值,并且基于结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用context.forward()转发相同的错误。可以使用本链接中提供的DSL来完成 现在,调用者再次需要检查并根据键来区分接收器主题。我使用processorAPI是因为我需要use头。 编辑: 当条件为false时,如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记

  • 问题内容: 我正在尝试清除组件,但找不到es6语法的参考。我正在使用: 但是,这不适用于es6类语法。 我将如何获得相同的结果? 问题答案: 据我所知,React组件不会保留初始状态的副本,因此您只需要自己做即可。 请注意,该行要求您永远不要改变状态,否则您将污染并且无法进行重置。如果无法避免突变,则需要在构造函数中创建的副本。(或按照进行功能。) 最后,建议您使用而不是。