当前位置: 首页 > 知识库问答 >
问题:

kafka以异步方式处理如何使状态存储中的条目过期的问题

笪波鸿
2023-03-14

我有一个kafka streams拓扑,它从输入主题中读取更新某些状态,并确定状态条目是否需要保留在状态存储中,或者可以删除。如果可以删除,它将被删除,否则我有一个标点器,每10秒运行一次,并使状态存储中的项目过期。

我最近发现标点符号在同一个流线程上运行,并且可能会阻塞流的处理。

我可以使用哪些模式在单独的线程池中执行标点符号内部的逻辑以避免阻塞流处理

谢谢你的帮助。

共有1个答案

山森
2023-03-14

马蒂亚斯·萨克斯已经说过,到目前为止,这在国有商店是不可能的,所以当他在合流公司工作时,我相信这是最新的消息。

然而,我们在本例中使用的是KStream KTable连接,而不是状态存储。我不确定,如果这对你的情况是可能的,但让我解释一下,我们所做的,也许对你也有一些用处:

我们有两个主题A和B,主题A被KStream使用。主题B与KTable一起使用。我们转换KTable数据,这样我们就可以在主题A的KStream上加入它。我们加入它,执行我们的操作,并通过使用mapthroughnull值写入主题B,从而从主题B中“删除”数据。因此,当我们在主题A中获得另一条记录时,我们的KTable中不再有可加入的值(这正是我们想要的)。

我希望它能有所帮助。

 类似资料:
  • 我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者

  • 问题内容: 在进行Ajax调用的redux-form 函数中,Ajax中需要Redux状态的某些属性(例如,用户ID)。 这将是很容易的,如果我想定义形式的成分,只要调用:在任何我需要传递,并从阅读中我的。 但是,像一个出色的React-Redux开发人员一样,我编写了 单独的视图组件和容器 ,因此视图组件可以很简单,几乎没有行为。因此,我想 在我的container中定义handleSubmit

  • 全局状态存储与普通状态存储有何不同? 全局状态存储是否在不同机器上运行的所有实例中都有数据副本?由于全局状态存储不使用任何更改日志主题进行恢复,因此在重新启动时它的行为在我的场景中全局存储的源主题没有键。

  • 我们目前正在实现一个过程(使用Kafka处理器API),我们需要将来自一个主题的两个相关事件(消息)的信息合并,然后转发这些合并的信息。事件源于物联网设备,由于我们希望保持其有序,因此源主题使用设备标识符作为键。事件还包含相关ID: 钥匙 留言 我们的第一种方法是创建一个具有连接状态存储的处理器,该存储存储每条传入的消息,使用相关ID作为键。这使我们能够查询存储以获取传入消息的相关ID,如果存储中

  • 我正在使用angular async本地存储模块设置本地存储中令牌的到期时间。 然后,我有一个在用户登录时调用的函数,该函数检查令牌是否仍然有效: 这里的问题是localStorage.getItem()返回一个可观察的。所以当我们在可观察的上调用。订阅时,它被异步调用,这意味着我们不会阻塞,直到结果准备好,所以代码直接通过并执行返回res语句,此时res是未定义的,因为订阅箭头函数中的代码尚未执

  • 我正在尝试在从CrudRepository扩展的存储库接口上执行我试图在方法中使用存储库实现来执行此操作。当我运行以下代码时,线程在 UpdateTasksService List 中永久等待