当前位置: 首页 > 知识库问答 >
问题:

在Kafka主题使用者上实现节流

朱俊雅
2023-03-14

我们正在构建一个系统,以可控且可预测的速率将数据从生态系统中的一个点移动到另一个点。该系统基本上有一个主要的Kafka主题,它获取N种类型的消息。然后,使用此主题的代理根据消息负载主题将消息放入N个工作队列中的一个。然后,这些工人队列让工人实际执行工作。

现在,我想实现控制旋钮,比如-暂停/恢复/限制来自代理的1种类型的消息。假设代理和工作人员之间没有通信,我怎么做呢。我是否应该为代理写入这些消息的每个worker单独设置一个“系统事件”主题?或者最好在顶层也有N个队列?

共有1个答案

颛孙兴旺
2023-03-14

我个人更喜欢由代理中的不同线程处理暂停/恢复/限制逻辑,这些线程位于Kafka使用者之后。这将确保如果要限制特定类型,它不会妨碍对其他类型的处理。因此,基本上Kafka使用者会根据类型将特定于类型的操作委托给不同的线程。

 类似资料:
  • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 我是Kafka的新用户,我们在应用程序中使用了Spring Web Flux。我们需要向两个不同的主题推送两个不同的消息,比如T1和T2。Kafka经纪人也是一样。我们正在使用ReactiveKafkaProducerTemplate,效果很好。 现在我们只需要单独压缩一个主题[T1]内容,因为消息大小在主题T1上更大。我们是否在响应式Kafka或Project Actor中支持路由KafkaTe

  • 我使用的是ActiveMQ Artemis 2.10和JMS Client1.1客户机。 如果我在我的地址上使用了多播路由类型,并且需要持久订阅,我如何在消费者端实现负载平衡? 在ActiveMQ5中,它将是虚拟目的地。 唯一的选择似乎是ActiveMQ Artemis2.10和JMS Client2.0,它们允许您创建共享的持久订阅,这对吗? 还有第三种选择吗?

  • 我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。 当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在。 我只需要最后一条消息,这是生产者发送的。 如何在不使用任何数组或任何东西的情况下获取最后一条消息。 有没有办法删除这个话