当前位置: 首页 > 知识库问答 >
问题:

过滤Kafka事件并稍后处理

寇丰
2023-03-14

我正在使用Kafka Streams API (KTable,GlobalKTable..).我在用KStreams消费Kafka主题。我需要根据一些配置过滤出一些传入的Kafka事件,并在配置发生变化时处理它们。主题的持续时间限制至少为7天。以下是要求:

键值状态

K1 V1加工

K2 V2 未处理(基于某些业务逻辑)

K3 V3 已处理

K4 V4加工

K1 V5加工 ------

现在我想再次处理消息(K2,V2)。我试着利用Ktables。但是,不能够成功。因为,我对这个概念比较陌生,不确定KStream、KTable是否能够满足这个要求。

共有1个答案

司英彦
2023-03-14

看起来您遇到了一个问题,即某些消息在第一次遇到时无法“处理”,并且您希望稍后返回并处理它们。

想到解决这个问题的唯一方法是将这样的消息转发到另一个主题供以后处理,(这里< code>branch函数可能有些用处),从而允许以线性方式继续处理主流。

您需要使用自定义处理器来处理延迟的主题,该主题可以选择Hibernate一段时间,或使用一些其他逻辑来确定何时处理消息。

但是,这种方法可能只适用于未处理的消息以后可以按最初遇到的相同顺序进行处理的情况。如果不是,则可能会遇到可处理消息位于延迟队列中不可处理消息后面的问题。您也许可以解决这个问题,因为超时,之后仍然无法处理的消息将回发到主题末尾。但这完全取决于您的用例。

 类似资料:
  • 我正在使用NSeriveBus构建一个系统,它应该只在特定的时间段将消息发送给远程处理程序。到目前为止,我设法将所有消息放在一个处理队列上,并从那里检查远程处理程序的可用性,如果处理程序不可用,我就不会通过边界发送消息。要做到这一点,我正在使用 但国家安全局会继续努力。这不像其他侦听器将在几分钟内启动并运行,但它可能有几个小时的停机窗口,因此这并不完全有效。 想知道是否有办法让总线稍后重试消息,或

  • 我一直在检查Kafka流。我一直在测试下面的Kafka流代码 生产者主题:(这是第一个生产者主题-发送以下json数据) JSON-主题的生产者: Stream Topic代码:(这是第二个Streaming代码和主题) 如果UserID值为“1”,我想对其进行归档,然后将该数据发送到目标流媒体主题。 当我使用“.filter”并打印System.out时。println(“value:”valu

  • 对于这个用例,我应该使用Kafka Consumer API还是Kafka Streams API?我有一个话题与一些消费群体消费它。本主题包含一种类型的事件,它是一个内部埋藏了一个类型字段的JSON消息。一些信息会被一些消费者群体消费,而另一些消费者群体不会消费,一个消费者群体可能根本不会消费很多信息。 我的问题是:我是否应该使用消费者API,然后在每个事件上读取type字段,并删除或处理基于t

  • web3j过滤器提供以太坊网络发生的某些事件的通知,对java和安卓程序员来说很有用。在Ethereum以太坊中支持三类过滤器: 块滤波器(Block filters) 未决交易过滤器(Pending transaction filters) 主题过滤器(Topic filters) 块过滤器和未决交易过滤器提供了在网络上创建新交易或块的通知。 主题过滤器更灵活。允许根据提供的特定标准创建过滤器。

  • 在Controller/JAX-RSendpoint处理请求后,如何执行Spring Security过滤器? 当请求被双向处理时,一个普通的java过滤器链应该得到控制。如何使用Spring Security过滤器实现这一点?我试着给chain打电话。doFilter()开始,并在调用后添加我的逻辑。但是,在处理请求后,控件不会返回。 关于如何实现这一点,有什么建议吗?是否有可能使用Spring

  • 在Java8的Streams中,我知道如何基于谓词过滤集合,并处理谓词为true的项。我想知道的是,如果谓词仅将集合划分为两个组,那么是否可以通过API基于谓词进行过滤,处理过滤结果,然后立即连接调用以处理过滤器排除的所有元素? 例如,考虑以下列表: 是否有可能做到: 或者我只需对过滤的项目执行过程,然后调用原始列表上的和,然后处理剩余的项目? 谢谢