当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka消费者-从复杂对象获取主题

笪煌
2023-03-14

我正在开发一个spring boot kafka消费者应用程序。它将有不同的消费者在不同的主题上工作。使用者的所有信息都来自application.yml文件。

html prettyprint-override">application:
  kafka:
    consumer-config:
      - name: consumer-a
        topics: topic1,topic2
        ......
      - name: consumer-b
        topics: topic3,topic4
        .....

我无法将应用程序属性中的主题列表设置到KafKalistener。

@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactory")


@KafkaListener(topics = "#{'${application.kafka.consumer-config.?[name == 'consumer-a'].topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")

在这两种情况下,我都得到以下错误:

java.lang.IllegalArgumentException:无法解析占位符

从应用程序属性获取主题并将其设置在KafkaListener主题上的最佳方法是什么?

共有1个答案

周和志
2023-03-14

你用的是什么版本?我刚测试了一下,效果很好...

@SpringBootApplication
public class So63583349Application {

    public static void main(String[] args) {
        SpringApplication.run(So63583349Application.class, args);
    }

    @KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}", id = "so63583349")
    public void listen(String in) {
        System.out.println(in);
    }

}

2020-08-25 13:02:28.384警告66237--[o63583349-0-C-1]org.apache.kafka.clients.NetworkClient:[Consumer ClientID=Consumer-SO63583349-1,GroupID=SO63583349]提取相关id为41的元数据时出错:{topic1=unknown_topic_or_partition,topic2=unknown_topic_or_partition}

对于第二个,不能在属性占位符内使用SpEL选择。这里有一个解决这种情况的方法:

@SpringBootApplication
public class So63583349Application {

    public static void main(String[] args) {
        SpringApplication.run(So63583349Application.class, args);
    }

    @KafkaListener(topics = "#{@props.consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')}",
            id = "so63583349")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    Props props() {
        return new Props();
    }

}

@ConfigurationProperties(value = "application.kafka")
class Props {

    List<Properties> consumerConfig;

    public List<Properties> getConsumerConfig() {
        return this.consumerConfig;
    }

    public void setConsumerConfig(List<Properties> consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

}
 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 我怎样才能暗示SpringKafka把每一个话题传播给一个不同的消费者呢? 干杯

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 我目前正在与Kafka和Flink合作,我有kafka在我的本地PC上运行,我创建了一个正在消费的主题。 桌面\kafka\bin\windows 有没有办法进一步了解这条消息的细节?比如说时间?钥匙我查看了Kafka的文档,但没有找到关于这个主题的内容