当前位置: 首页 > 知识库问答 >
问题:

是否有可能有一个Kafka消费者线程每个主题?

黎苑博
2023-03-14

ConsumerThread1-[topic1-0,topic2-0,topic3-0]
ConsumerThread2-[topic1-1,topic2-1,topic3-1]

但是,我们希望每个主题有一个使用者线程,而不是每个分区有一个KafkaListener(或使用者线程)。例如:

ConsumerThread1-[topic1-0,topic1-1]
ConsumerThread2-[topic2-0,topic2-1]
ConsumerThread3-[topic3-0,topic3-1]

有什么办法可以做到这一点吗?

共有1个答案

罗金林
2023-03-14

您可以从Spring-Kafka:2.2中为每个主题创建单独的容器,并将并发性设置为1,以便每个容器将从每个主题中消耗

从版本2.2开始,您可以使用同一个工厂创建任何ConcurrentMessageListenerContainer。如果您希望创建多个具有相似属性的容器,或者希望使用一些外部配置的工厂,例如Spring Boot自动配置提供的工厂,这可能会很有用。创建容器后,可以进一步修改其属性,其中许多属性是通过使用container.getContainerProperties()设置的。下面的示例配置ConcurrentMessageListenerContainer:

@Bean
html" target="_blank">public ConcurrentMessageListenerContainer<String, String>(
    ConcurrentKafkaListenerContainerFactory<String, String> factory) {

ConcurrentMessageListenerContainer<String, String> container =
    factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}

注意:以这种方式创建的容器不会添加到endpoint注册表中。应该将它们创建为@bean定义,以便在应用程序上下文中注册它们。

 类似资料:
  • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。

  • 有一个16个分区的Kafka主题 使用给定的消费者组名称,我们目前正在启动单个消费者来阅读该主题。 > 单个消费者是否从该主题的(仅)读取?如果带有消息为空,消费者是否从下一个分区开始读取(...等等)? 我们可以选择启动多个消费者(使用相同的消费者组名称)来读取相同的主题(有16个分区)。为了并行读取多个分区,可以维护多少消费者?

  • 我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?

  • 我了解到,每个kinesis流可以有多个消费者应用程序。 http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html 但是,我听说你只能在每个分片的消费者上有。这是真的吗?我找不到任何留档来支持这一点,我无法想象如果多个消费者从同一个流中阅读会是什么样子。当然,这并不意味着生产者需要为不同的消费者

  • “Kafka spout”和“Kafka Consumer”都从Kafka经纪人那里检索数据,到目前为止我知道的spout是用来与Storm通信的,而Consumer是用来与其他任何东西通信的。 --但是,技术上的区别是什么? -或者,如果我使用Consumer提取数据,然后使用“Storm Spout”接收数据,和如果我只是使用“Kafka Spout”,然后将其添加到我的Storm拓扑构建器的

  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。