当前位置: 首页 > 知识库问答 >
问题:

在 Kafka 中等待 N 个事件:组计数筛选器方法如何工作?

云开诚
2023-03-14

在《设计事件驱动系统》一书中,作者强调了业务系统中的一个常见用例,其中需要等待或发生N个事件。

给出的示例是一个订单服务,它需要等待三个单独的验证服务(都是通过同一主题发送的)返回PASS。(我的解释是,对于同一主题,将有三条具有相同密钥的验证消息,每条消息都有一个表示成功或失败的值。)

作者表示,解决方案将采用以下形式(假html" target="_blank">设计数基于密钥):

  1. 按密钥分组。
  2. 计数每个键的出现次数(使用使用窗口执行的聚合器)。
  3. 过滤所需计数的输出。

上述每个步骤究竟是如何工作的,涉及哪些类/方法?

  • 特别是,第一步(按键分组)是指使用KStream::groupByKey方法吗
  • 如果是这样,输出将是一个KGroupedStream,步骤二可能使用一个Count方法并返回一个KTable
  • 但是,有了KTable,我们如何按照步骤三过滤KTable上的输出

共有1个答案

辛承
2023-03-14

我想你的假设是对的。对于步骤(2 ),这要视情况而定,但是如果您假设您正在等待的所有消息都具有相同的密钥,并且您只对收到三个具有相同密钥的消息感兴趣,那么您需要调用< code>count()。

作为下一步(即(3)),您将调用KTable#过滤器()来获取计数为3的所有行。

最后,您可以调用toStream(),并且每次一个键达到计数3时,此流都应该包含一条记录。

(附带说明:默认情况下,所有条目都将永远留在< code>KTable中,因此您还需要注意删除在某个时候达到计数3的条目。)

 类似资料:
  • 我有一个控制器类,它需要为列表中的每个项目执行各种异步操作: 对于每个A 播放音频并等待完成 等t秒 对于A的每个子部分B: 播放音频并等待 等待t2秒 当我的方法被激发。我试着回复听众: 但最后我还是无法摆脱内部循环,这让我觉得整个想法都行不通。我已经查看了未来的界面,但这似乎也不是我想要的。 我知道我可以用一堆变量来解决这个问题,以跟踪我在状态流中的位置,但是带有一些框架的循环会更干净。谢谢:

  • 问题内容: 我需要启动大量goroutine,并等待其终止。直观的方法似乎是使用通道来等待所有操作完成: 但是问题是对象的数量以及goroutine的数量可能会改变。是否可以更改通道的缓冲区大小? 也许有更优雅的方式做到这一点? 问题答案: 我已经使用WaitGroup作为此问题的解决方案。翻译您的当前代码,并附上一些日志,以明确正在发生的事情:

  • 我正在写一段代码来登录Gmail。在密码页面上,我不是使用隐式等待,而是想改用显式等待。然而,它不是拿起我的选择器? 我得到了一个错误: (节点:14428)UnhandledPromiseRejectionWarning:错误:processTicksAndRejections(internal/process/next_tick.js:81:5)处的elementHandle._clickab

  • 我有非常简单的AngularJs 1.4.8前端: 填写表单并按下OK按钮后,新人员将添加到表中。 表单在中,表在中,两者都是下的孩子。 添加用户时(单击确定)会发生以下情况: < li >向服务器发送< code>POST请求,以便添加新人员。 < li >使用< code>$emit从< code > addingPersonController 向< code>masterControlle

  • 正如您所看到的,它有一个构造函数,它可以得到过滤器列表,所以我可以根据需要从链中删除一个过滤器,其余的都可以正常工作。但是我不能为这样的构造函数在安全配置中创建bean。如果我使用 当然,它使用默认构造函数构建对象。好的,我试着用一些过滤器的列表来制作bean: 但这无法编译,因为BasicUserApprovalFilter是未知bean。那么如何从默认筛选器堆栈中排除一个筛选器呢?如果我用自定

  • 我创建了一个日历应用程序,可以添加注释。为了实现添加注释的功能,我创建了一些父组件,它有自己的状态,然后传递给子组件。子组件应该在构造函数中接受来自执行的。但是,由于setState函数异步ChildComponent没有时间等待父母组件的道具。 如何设置ChildComponent的初始状态,等待父母组件的道具(换句话说,同步)? 父组件: ChildComponent: