当前位置: 首页 > 知识库问答 >
问题:

从队列使用者动态打开Kafka流

佟和安
2023-03-14

我们有一个用例,在这个用例中,基于到达工作队列的工作项,我们需要使用消息元数据来决定数据流来自哪个Kafka主题。我们部署的工作节点可能少于100个,每个工作节点可以有可配置数量的线程从队列接收消息。因此,如果一个工人有“N”个线程,我们会打开Kafka流到“N”个不同的主题。(n通常小于10)。一旦worker完成了对消息的处理,我们还需要关闭流。工作人员可以接收下一个消息,一旦它的第一个消息,并在这一点上,我需要打开一个Kafka流为另一个主题。此外,每个kafka流都需要扫描所有分区(大约5-10个),以便根据某个属性筛选主题。

像这样的流能为Kafka流工作吗,或者这不是一个最佳的方法?

共有1个答案

汪耀
2023-03-14

我不确定我是否完全理解这个用例,但它似乎是一个从主题a到主题B“简单”复制数据用例,即,没有数据处理/修改。但是,从输入到输出主题复制数据的逻辑似乎很复杂,因此使用Kafka流(即Kafka的流处理库)可能不是最好的选择,因为您需要更多的灵活性。

但是,使用简单的KafkaConsumersKafkaProducers应该允许您实现您想要的内容。

 类似资料:
  • 问题内容: 想要使用高级消费者API实现延迟的消费者 大意: 按键生成消息(每个消息包含创建时间戳记),以确保每个分区按生成时间对消息进行排序。 auto.commit.enable = false(将在每个消息处理之后显式提交) 消费一条消息 检查消息时间戳,并检查是否经过了足够的时间 处理消息(此操作将永不失败) 提交1个偏移 有关此实现的一些担忧: 提交每个偏移量可能会使ZK变慢 Consu

  • 问题内容: 这是我的情况: 当用户登录到我的网站时,我为给定的用户排队执行一系列任务(通常每个任务花费100毫秒的时间,每个用户有100毫秒的任务)。这些任务排队到默认的Celery队列中,而我有100的工人正在运行。我使用websockets在后端完成任务时向用户显示实时进度。如果我只有1个或2个用户处于活动状态,那么生活会很好。 现在,如果我有几个并发用户登录到我的站点,则后一个用户将排在初始

  • 我正在从Sahni的“C语言数据结构基础”中学习数据结构。在使用动态数组的循环队列中,作者提到了以下几点, 假设capacity是循环队列的初始容量,我们必须首先使用realloc增加数组的大小,这将把最大容量元素复制到新的数组中。为了获得正确的循环队列配置,我们必须将右段中的元素(即元素a和B)滑动到数组的右端(参见图3.7.d)。数组加倍和向右滑动一起最多复制2*容量-2个元素。

  • 在一个Spring Boot应用程序中,我想使用Spring集成从一个Kafka队列中读取。配置了以下内容: bean实现。 调试时,很明显,在(stacktrace的顶部)中,为空。 在Spring集成中,将通道连接到应该处理消息的bean的正确方法是什么?

  • 我有多个主题A、B和C,我正在使用Kafka Streams将它们扇形成主题X。主题A、B和C使用默认主题名称策略在模式注册表中注册为主题。流式传输非常愚蠢,因为它只是将消息扇形,而不确保它们符合注册表中的模式,但它会在消息中添加一个ORIGINAL_TOPIC_NAME标头以指示它来自主题A、B或C。 然后,我让Kafka消费者从主题X中消费。该主题未在模式注册表中注册。Kafka消费者是我使用