当前位置: 首页 > 知识库问答 >
问题:

用于事件过滤的Kafka使用者API与Streams API

毕嘉
2023-03-14

对于这个用例,我应该使用Kafka Consumer API还是Kafka Streams API?我有一个话题与一些消费群体消费它。本主题包含一种类型的事件,它是一个内部埋藏了一个类型字段的JSON消息。一些信息会被一些消费者群体消费,而另一些消费者群体不会消费,一个消费者群体可能根本不会消费很多信息。

我的问题是:我是否应该使用消费者API,然后在每个事件上读取type字段,并删除或处理基于type字段的事件。

谢谢你。

共有1个答案

武功
2023-03-14

这似乎更多的是见仁见智的问题。我个人倾向于使用Streams/KSQL,这可能是您需要维护的较小代码。您可以有另一个中间主题,其中包含清理后的数据,然后可以将这些数据附加到连接接收器、其他使用者或其他流和KSQL进程。使用流,您可以在不同的机器上扩展一个应用程序,您可以存储状态,拥有备用副本等等,这将是一个PITA做这一切自己。

 类似资料:
  • 我正在使用Kafka Streams API (KTable,GlobalKTable..).我在用KStreams消费Kafka主题。我需要根据一些配置过滤出一些传入的Kafka事件,并在配置发生变化时处理它们。主题的持续时间限制至少为7天。以下是要求: 键值状态 K1 V1加工 K2 V2 未处理(基于某些业务逻辑) K3 V3 已处理 K4 V4加工 K1 V5加工 ------ 现在我想再

  • 我有事务性的和正常的生产者在应用程序,是写到主题Kafka-主题如下。 事务性Kafka生产者的配置 普通生产者配置相同,只有ProducerConfig.client_id_config和ProducerConfig.Transactional_id_config未添加。 使用者配置如下 因为我将isolation.level设置为read_committed,所以它应该只使用来自订阅主题的事务

  • 我试图理解Kafka的事务性API。此链接定义原子读-进程-写周期如下: 首先,让我们考虑原子读-进程-写周期是什么意思。简而言之,它意味着如果应用程序在某个主题分区tp0的偏移量X处消耗消息A,并在对消息A进行一些处理后将消息B写入主题分区tp1,使得B=F(A),那么只有当消息A和B被认为成功消耗并一起发布或根本不发布时,读-进程-写周期才是原子的。 它还说: 使用为至少一次交付语义配置的va

  • 我最近开始学习Kafka,最后就问了这些问题。 > 消费者和流的区别是什么?对我来说,如果任何工具/应用程序消费来自Kafka的消息,那么它就是Kafka世界中的消费者。 流与Kafka有何不同?为什么需要它,因为我们可以使用消费者API编写自己的消费者应用程序,并根据需要处理它们,或者将它们从消费者应用程序发送到Spark? 我做了谷歌对此,但没有得到任何好的答案。抱歉,如果这个问题太琐碎了。

  • 本文向大家介绍dynamics-crm 使用过滤器过滤API查询,包括了dynamics-crm 使用过滤器过滤API查询的使用技巧和注意事项,需要的朋友参考一下 示例 您可以使用filter属性从CRM检索值的子集。在此示例中,仅返回公司名称等于CompanyName的帐户。            

  • 我了解了如何通过使用Apache Kafka作为事件代理来实现事件源。(链接到融合文章)