当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams DSL:如何关闭由流分支/分组创建的子流?

左康安
2023-03-14

我正在用Scala编写一个Kafka Streams应用程序,我担心潜在的内存泄漏/总的资源使用。

是否有一种方法向Kafka发出信号,让它“关闭”分组/分支操作创建的特定子流,并释放相关资源?

为了演示潜在的问题,让我们考虑一个电子商务应用程序,它将订单状态更改事件推送到一个名为“my-super-input-topic”的Kafka主题。每个订单都由OrderId唯一标识,OrderId用作Kafka消息键。

假设我们需要计算每个订单的状态更新计数,并将结果推送到“my-super-output-topic”主题。下面的代码段演示了如何在Scala中执行此操作:

// ...
val builder = new StreamsBuilder
val ktable = builder.stream("my-super-input-topic")
    .groupByKey
    .count

ktable.toStream.to("my-super-output-topic")
// ...

据我所知,.groupby/.groupbykey将源流划分为N个子流(在本例中,每个订单一个子流)。上面的代码没有指定任何保留窗口,因此,即使给定的订单(子流)在几小时不活动后接收到事件--它仍将被正确处理,并且更新将被推送到包含正确聚合计数的接收器主题。

因此,我得出结论,Kafka将每个子流的信息保存在某种内部存储中。

然而,订单有有限的生存期,并且在某个时间订单完成之后,这意味着与该订单相关的子流将永远不会接收到更多的事件。但Kafka仍然把它当作一个有效的,等待进一步的消息,越来越多的“死”子流会随着越来越多的订单被完成而累积。如果Kafka至少专用一些资源来跟踪每个子流,那么“死的”子流就会造成广泛的内存使用,即使这是完全不必要的。

因此,一旦系统了解到相关订单已完成,处理/关闭特定子流将是合理的。

注意:这是一个虚构的用例来演示具体问题,而不是一个真实的任务。请不要建议在没有Kafka流的情况下实施。

共有1个答案

权烨磊
2023-03-14

您的聚合将永远为每个键保留一个计数,这是正确的。然而,“子流”是基于每个分区的,因此,每个子流应该总是包含一些数据。

无法关闭拓扑的部分。

如果您担心KTable存储区的无限增长,您可以考虑(1)使用窗口存储区,该存储区最终将驱逐旧数据;(2)使用Aggregate()而不是count:默认情况下Aggregate()将仅计数,但如果订单已完成,UDF可以返回NULL--这将从存储区中删除订单的键值对。(3)或者您考虑不使用DSL,而是使用处理器API,该API提供更多的控制/灵活性来主状态存储(您也可以考虑使用“标点符号”)。

您可能还对以下内容感兴趣:https://issues.apache.org/jira/browse/kafka-4212

 类似资料:
  • 我的问题是,我有一个未知的组数,如果mapAsync的并行数少于我得到的组数,并且在最后一个接收器中出错 由于上游错误(Akka.Stream.Impl.StreamSubscriptionTimeoutSupport$$Anon$2),正在拆除SynchronousFileSink(/users/sam/dev/projects/akka-streams/target/log-error.txt

  • 在偶尔的情况下,你可能会想要保留那些与你的代码没有共同祖先的分支。例如在这些分支上保留生成的文档或者其他一些东西。如果你需要创建一个不使用当前代码库作为父提交的分支,你可以用如下的方法创建一个空分支: git symbolic-ref HEAD refs/heads/newbranch rm .git/index git clean -fdx <do work> git add your

  • 想改进这个问题吗 通过编辑此帖子,添加详细信息并澄清问题。 很抱歉为这些愚蠢的问题烦恼,但我如何才能禁用目录子文件夹分组?每次我用子文件夹创建目录时,它们都被分组在一行中(参见屏幕)。目录屏幕感谢您的回答。

  • 问题内容: 我正在使用创建一个选项对话框; 对于options参数,我传递了一个JButtons数组,每个数组都有自己的。 这些按钮之一负责关闭对话框。我的问题是:我应该在关闭按钮的事件处理程序中放置什么代码以关闭选项对话框? 这可能会有所不同:负责显示此对话框的类是单例,因此,负责显示对话框的方法是 static 。因此,调用“在静态上下文中”不起作用。 谢谢 问题答案:

  • 创建分组       进入分组管理界面,点击新增分组,打开新增分组面板,输入分组名称,选择对应的权限,点击确定,完成分组创建。

  • 创建一个分组,是非常简单的,我们来学习下吧 try { // 创建分组 $group = Sentry::createGroup(array( 'name' => 'admin', 'permissions' => array( 'admin.index' => 1, 'users.ind