当前位置: 首页 > 知识库问答 >
问题:

Kafka流和写入状态存储

锺高翰
2023-03-14

我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要:

  1. 使用可在以后检索的连续消息流。
  2. 保留与某些条件匹配的邮件 ID 列表。
  3. 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。
  4. 从列表中删除已处理的消息 ID,以便不重复工作。

我已考虑如下实施:

    < li >将传入的消息流作为实体化的KTable使用,以便以后可以按键查找和检索消息。 < li >在另一个状态存储中具体化消息id列表。 < li >使用Spring的调度机制运行一个单独的线程,该线程通过< code > InteractiveQueryService bean从状态存储中读取数据

我遇到的问题是InteractiveQueryService提供对状态存储的只读访问,因此我无法删除另一个线程中的条目。我决定不使用Kafka Stream的标点符号功能,因为语义学不同;我的调度线程必须始终以固定的间隔运行,而不管入站消息的处理情况如何。

另一种选择可能是使用低级处理器API,并将可写状态存储的引用传递给我的调度程序线程。我需要同步写操作。但我不确定这是否是可行的,或者在从单独的线程访问状态存储时是否存在其他约束。

任何意见或建议将不胜感激!

共有1个答案

白永昌
2023-03-14

我的调度线程必须总是定期运行,不管入站消息的处理如何

嗯,基于< code>WALL_CLOCK_TIME的标点符号完全符合您上面的描述。

我遇到的问题是InteractiveEquieryService提供了对状态存储的只读访问

使用处理器API和标点允许您使用ProcessorContext#getStateStore()访问init()中的状态存储,并从ProcessorContext#计划()中的存储中删除条目。这种解决方案的优点是,处理器和标点符号运行在同一个线程中,您不需要它们之间的任何同步。

 类似资料:
  • 我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。 有没有更好的方法使用DSL来编写这个逻辑? 上面代码中关于聚合创建的状态存储的两个问题。 null 提前道谢!

  • 我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知

  • 我用状态存储构建了一个kafka流媒体应用程序。现在我正在尝试扩展这个应用程序。当在三个不同的服务器上运行应用程序时,Kafka会随机拆分分区和状态存储。 例如: Instance1获取:分区-0,分区-1 Instance2获取:partition-2,stateStore-repartition-0 Instance3获取:stateStore-重新分区-1,stateStore-重新分区-2

  • 我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者

  • 我有一个KStream,其中包含从主题到1的数据,如下所示: 和KTable,构造如下: 稍后,主题To2中出现以下消息: 现在,我希望我的KTable能够反映这些变化,并且看起来像这样: 但看起来是这样的: 我想我缩小了范围:显然聚合的只在第一次调用--之后聚合总是接收作为最后一个参数,例如。 其中,在第一次调用(通过初始值设定项创建)时为,但在第二次调用时为。 有什么想法吗? 编辑2 编辑3