当前位置: 首页 > 知识库问答 >
问题:

多个消费者如何在Spring引导Kafka中收听多个主题?

齐磊
2023-03-14

当有多个消费者时,我无法听到kafka主题(我的案例2主题)。在下面的例子中,我有2个消费者工厂,它将接受2个不同的JSON消息(一个是用户类型,另一个是事件类型)。这两条消息都发布到不同的主题。在这里,当我试图从topic1访问事件消息时,我无法访问,但我可以访问用户主题消息。

例如:

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {      
@Autowired
private Environment environment;

@Bean
public ConsumerFactory<String,User> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
            new JsonDeserializer<>(User.class));

}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String , Event> consumerFactoryEvent(){
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("event.consumer.group"));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
            new JsonDeserializer<>(Event.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
    ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactoryEvent());
    return factory;
}
}

我的主要应用如下:

@KafkaListener(topics = "${event.topic}")
public void processEvent(Event event) {
..do something..
..post the message to User topic
}
@KafkaListener(topics = "${user.topic}")
public void processUser(User user) {
..do something..
}

我的需要是首先收听事件主题并对消息进行一些按摩,然后向其发送用户主题,我有另一种方法将收听用户主题并对该消息执行某些操作。我试图将不同的选项传递给@KafkaListener例如

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent")

但它不起作用。我不确定出了什么问题。任何建议都很有帮助!

共有3个答案

傅志文
2023-03-14

很明显,回答得太晚了。但它可能会帮助其他人。

您不需要创建多个消费者工厂Bean。您可以在配置中不通知类(UserEvent),即new JsonDeserializer

@Bean
public ConsumerFactory<String,User> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);

    // TODO: Remove "*" and add specific package name
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // <<-- New config added

    return new DefaultKafkaConsumerFactory<>(config);

}

接收记录时:

@KafkaListener(topics="${event.topic}")
void receiveUserRecord(User record){ ... } # For User POJO

@KafkaListener(topics="${event.topic}")
void receiveEventRecord(Event record){ ... } # For Event POJO

洪飞白
2023-03-14

在任何文档中都不容易获得它。

这里我以使用来自

topic=topic1,bootstrapserver=url1(JSON序列化程序和反序列化程序)

topic = topic 2 with bootstrapserver = URL 2(Avro序列化程序和反序列化程序)

第1步:-

@Bean
public ConsumerFactory<String, String> consumerFactory1() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost1:9092"); //This is dummy
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConsumerFactory consumerFactory2() {
    Map props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost2:9092"); //This is dummy
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("schema.registry.url", "https://abc.schemaregistery.example.com"); //Again this is dummy or can be avro serilaised class
    return new DefaultKafkaConsumerFactory<>(props);
}


  @Bean(name = "kafkaListenerContainerFactory1")
public ConcurrentKafkaListenerContainerFactory
kafkaListenerContainerFactory1() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory1());
    return factory;
}

 @Bean(name = "kafkaListenerContainerFactory2")
public ConcurrentKafkaListenerContainerFactory
kafkaListenerContainerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory2());
    return factory;
}

步骤2:-

  • @SpringBootApplication(排除 = Kafka自动配置.class) =

第三步:-

 @KafkaListener(
            topics = "topic1",
            containerFactory = "kafkaListenerContainerFactory1" ,
            groupId = "com.groupid1")
    public void receive(ConsumerRecord consumerRecord) throws InterruptedException {


        LOGGER.info("consuming from topic1 {}" , consumerRecord.value());
        Thread.sleep(1000000); //For testing

    }

 @KafkaListener(
            topics = "topic2",
            containerFactory = "kafkaListenerContainerFactory2" ,
            groupId = "com.groupid2")
    public void receive(ConsumerRecord consumerRecord) throws InterruptedException {


        LOGGER.info("consuming from topic2 {}" , consumerRecord.value());
        Thread.sleep(1000000); //For testing

    }
柳梓
2023-03-14

如果您没有在bean中指定名称,则方法名称将是bean名称,请在@KafkaListener中添加带有groupid的bean名称

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent", groupId="")

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactory", groupId="")

或者

在<code>@Bean</code>中指定名称,并将该名称添加到<code>@kafkaListener</code>中

@Bean(name="kafkaListenerContainerFactoryEvent")
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryEvent());
return factory;
}
 类似资料:
  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

  • 有人知道一个听众是否可以听下面这样的多个话题吗?我知道“主题1”很管用,如果我想添加其他主题呢?你能给我举个例子吗?谢谢你的帮助! 或者

  • 我正在使用spring kafka来消费来自kafka的消息。消费者监听器如下。 应用程序的单个实例,并发数为6。 该主题有6个分区。 从上面的日志中可以清楚地看到,两个用户在完全相同的时间收到了来自分区和偏移量的相同消息。 每个线程继续处理消息。最后,其中一个消费者失败了,错误如下 我知道上面的错误会在有负载或消息处理需要时间时出现。在这种情况下,处理不到一秒钟,kafka主题中的消息不到10条

  • 我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?

  • 问题内容: 我是一名学习Kafka的新学生,在了解多个消费者(到目前为止,文章,文档等对他们没有太大帮助)方面,我遇到了一些基本问题。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,将100条简单消息发布到某个主题,然后让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入另一个使用者来使用与刚刚发布消息的主题相同的主题时,它没有收到消息。 据我了解,对于每个主题

  • 我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?