当前位置: 首页 > 知识库问答 >
问题:

KStream筛选器是否会使用每条消息?

曾嘉瑞
2023-03-14

我过去使用过Kafka,但从未使用过streams API。我的任务是构建一个可扩展的服务,该服务接受websocket连接,并根据用户id将出站消息从中心主题路由到正确的会话。

使用KStream看起来非常简单

builder.stream(inputTopic, Consumed.with(Serdes.String(), publicationSerde))
        .filter((name, publication) -> "George R. R. Martin".equals(publication.getName()))
        .to(outputTopic, Produced.with(Serdes.String(), publicationSerde));

但是筛选器命令是否消耗了主题中的每一条消息并在应用程序空间中执行筛选器?或者KStream

KStream上的措辞

如果过滤器的唯一目的是使用某个主题的每一条消息,并扔掉那些不相关的消息,我可以手动完成。

共有1个答案

裴弘
2023-03-14

您是正确的-消息需要反序列化,然后根据谓词进行检查(在应用程序空间中)

扔掉那些不相关的东西,我可以手工完成

当然可以,但是Kafka Streams提供了定义会话窗口的有用方法。另外,您不需要定义消费者和生产者实例来转发新主题。

 类似资料:
  • 主要内容:创建条件筛选器在Tableau中,条件过滤器用于将某些条件应用于现有过滤器。这些条件非常简单,例如,仅查找高于特定金额的销售额。此外,这些条件可用于创建范围过滤器。 创建条件筛选器 例如,假设有一个Sample-superstore数据源,在销售额超过200万的所有细分市场中找到产品的子类别。在Tableau中创建条件筛选器有以下一些步骤。 第1步: 将Segment字段和Sales字段拖到列工具架。 第2步:

  • 我正在使用谷歌表单的过滤功能,但无法按我想要的方式使用,已经3天了。。。 基本上,我有第1页,有一列“电子邮件”和一列“潜在客户ID”。表2具有相同的“潜在客户ID”,但已过滤。含义,第1页,其“顺序为1,2,3,4,5…”。。。第二张不是,像是2,4,5,23,41。。。我想在表1中找到正确的电子邮件地址,该地址在两个表中具有相同的Lead ID。我使用了Filter函数,它工作得非常好,因为它

  • 好的,目标是:我有一个应该发送邮件的服务,如果失败,我的Kafka制作人将把这封邮件发送到Kafka主题。第二个程序每两分钟查看一次主题,应该只使用一条消息(最早的一条),然后重试发送,如果失败,程序应该将此消息返回主题。 我已经有了一个消费者,但问题是,它会消耗我直到现在还没有使用消费者的所有消息。但我希望他只吃最老的,他以前从未吃过。 这是我的实际消费者: “CustMessage”是我为测试

  • 这是web.xml代码: 服务器以这种方式运行得很好,如果我删除过滤器上的注释,它将无法启动,并会抛出那些异常: 严重:子容器在启动Java . util . concurrent . execution exception时失败:org . Apache . Catalina . life cycle异常:无法启动组件[StandardEngine[Catalina]。StandardHost[

  • 应用筛选器之前的示例使用者记录是(在值中查找GP_ID): 当我在kafkaListenerContainerFactory()中按如下方式设置recordFilterStrategy时: KafKareCordvo.ConvertByteBufferToLong正在将bytebuffer值转换为long值。 但是,当它被Kafka听众按以下方式消费时: 这将返回删除我筛选的字段值的记录:“gp_

  • 首先,我试图使选择所有复选框,如果我单击表头中的选择所有复选框,整个表行将选择并显示一个复选框反向消息,即我选择了多少复选框。这里的问题是,如果我单击select all复选框,反向消息不会显示楼上的表,即我选择了多少行。 其次,如果我从任何列中筛选任何数字,相同的数字将显示同一列中有多少行具有相同的数字。如果我选中了所有复选框,那么反向消息将显示我选中了多少行复选框。这里,问题是显示整个表行计数