当前位置: 首页 > 知识库问答 >
问题:

如何在应用程序属性中拥有多个kafka消费群体

乜业
2023-03-14

我正在使用spring kafka并希望在应用程序yaml中提到多个消费者群组,我可以在我的kafkaListener类中使用这些群组,我正在听多个主题。

application.yml文件中的kafka属性现在如下所示

kafka:
    properties:
      topics:
        topic1: topic1
        topic2: topic2
    bootstrap-servers: server1,server2
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 4
    consumer:
      group-id: mygroupid
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

有没有办法在上面的使用者块中有多个群组。

 @KafkaListener(topics = "topic1",  groupId = "cd1")
  public void consumeMessage(String message) throws Exception {
    // some code goes here
  }

共有1个答案

锺离高丽
2023-03-14

引导仅从yml自动配置一个DefaultKafkaConsumerFactoryDefaultKafkaConsumerFactory,因此所有属性在所有使用者之间共享。这就是为什么我们添加了groupid(如果未提供groupid,则使用id);这是消费者之间变化最常见的属性。

当然,您可以使用属性占位符,因此groupid=“${group.one}”将使用YML中的属性group.one

要更改更基本的东西,如序列化器/反序列化器,如果您使用的版本早于2.2.4,则需要创建多个工厂和容器工厂。

但是,从版本2.2.4开始,您现在可以在kafkalistener注释中设置任何任意的kafka使用者属性...

/**
 * Kafka consumer properties; they will supersede any properties with the same name
 * defined in the consumer factory (if the consumer factory supports property overrides).
 * <h3>Supported Syntax</h3>
 * <p>The supported syntax for key-value pairs is the same as the
 * syntax defined for entries in a Java
 * {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
 * <ul>
 * <li>{@code key=value}</li>
 * <li>{@code key:value}</li>
 * <li>{@code key value}</li>
 * </ul>
 * {@code group.id} and {@code client.id} are ignored.
 * @return the properties.
 * @since 2.2.4
 * @see org.apache.kafka.clients.consumer.ConsumerConfig
 * @see #groupId()
 * @see #clientIdPrefix()
 */
String[] properties() default {};

注意,这些属性是本机的kafka虚线格式(auto.offset.reset),而不是boot连字符或camel大小写属性。

下面是文档中的一个示例:

@KafkaListener(topics = "myTopic", groupId="group", properties= {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
 类似资料:
  • 问题内容: 我是一名学习Kafka的新学生,在了解多个消费者(到目前为止,文章,文档等对他们没有太大帮助)方面,我遇到了一些基本问题。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,将100条简单消息发布到某个主题,然后让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入另一个使用者来使用与刚刚发布消息的主题相同的主题时,它没有收到消息。 据我了解,对于每个主题

  • 我想在一个spring boot应用程序中创建多个Kafka消费者组,以处理不同的Kafka队列。需求场景基于消息的关键性,它应该被推送到不同的Kafka队列。为了管理不同的Kafka队列,我想创建一个专用的Kafka消费群体。但我不确定我是否可以在一个spring boot应用程序中创建多个Kafka消费群体。 目前,我有三个Kafka主题,每个主题有4个部分,只有一个Kafka消费群体有三个K

  • 我是一个学习Kafka的新学生,我遇到了一些关于理解多个消费者的基本问题,到目前为止,文章、文档等都没有太大的帮助。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者,并同时运行他们,发布100个简单的消息到一个主题,并让我的消费者检索他们。我成功地做到了这一点,但是当我试图引入第二个消费者来消费刚刚发布消息的同一主题时,它没有收到任何消息。 我的理解是,对于每个主题,您可以有来自不同消

  • 我是Kafka的新手,我将非常感谢关于下一个案件的澄清。 Kafka文档在“消费者立场”一段中说: 问题是,如果只有一个消费者能够拉出特定的信息,那么如何向多个消费者群体广播呢?

  • 我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable

  • 这是否意味着我的整个应用程序只能连接到单个Kafka集群,或者KafkaStreams的每个实例只能连接到单个集群? 我可以创建多个连接到不同集群的具有不同属性的KafkaStreams实例吗?