当前位置: 首页 > 知识库问答 >
问题:

如何在一个属性文件中恰当地外部化spring-boot kafka-streams配置?

吕鸿轩
2023-03-14

我试图将我目前用Java代码编写的spring-kafka应用程序的配置外部化。我是否应该将< code>ProducerConfig和< code>ConsumerConfig值放入< code > spring . Kafka . streams . properties 中,或者如果我通过< code > spring . Kafka . producer 和< code > spring . Kafka . consumer 提供它们,它们是否会得到正确配置?

到目前为止,我似乎应该将所有配置都放入 KafkaStreams 配置类型的 Bean 中,以便配置我的 kafka 流应用程序。

当我将此配置外部化时,在< code > application . properties 文件中设置< code>ProducerConfig和< code>ConsumerConfig的属性值似乎与它们在spring-boot创建的< code > KafkaStreamsConfiguration 中没有关联(我通过在某处自动连接配置并查看它来确认这一点)。

如果我改为通过< code > spring . Kafka . streams . properties 提供< code>ProducerConfig和< code>ConsumerConfig值,它们会显示在< code > KafkaStreamsConfiguration 中。

以下是我的旧Java配置:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put("replication.factor", replicationFactor);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "600000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new KafkaStreamsConfiguration(props);
    }

这最终导致生产者配置消费者配置值不在运行时的 KafkaStream 配置中:

spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.consumer.group-id=<group_id> #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.consumer.auto-offset-reset=latest #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor

但是,这确实会导致< code > KafkaStreamsConfiguration 具有预期的值:

spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.streams.properties.group-id=<group_id> #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.streams.properties.compression-type=lz4 #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.streams.properties.auto-offset-reset=latest #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor

我原本以为生产者配置消费者配置值在分别通过春卡福卡生产者和卡福卡消费者设置时会传播到Kafka流配置。特别是因为我在IntelliJ中获得了智能感知,用于应用程序属性中的生产者和消费者配置。

也就是说,我是否需要确保通过< code > spring . Kafka . streams . properties 设置它们,以便正确配置应用程序?

共有1个答案

养枫涟
2023-03-14

spring.kafka.consumer.group-id=

Streams将group.id设置为application.id属性。

公共静态最终字符串APPLICATION _ ID _ CONFIG = " APPLICATION . ID ";

private static final String APPLICATION_ID_DOC=“流处理应用程序的标识符。在Kafka集群中必须是唯一的。它用作1)默认客户端ID前缀,2)成员身份管理的组ID,3)更改日志主题前缀。”;

Kafka财产

生产者消费者属性是截然不同且不相关的。

< code > spring . Kafka . producer . compression-type = lz4 #这不会显示在KafkaStreamsConfiguration中

压缩类型不作为流的第一类启动属性公开。您可以使用设置它

spring.kafka.streams.properties.compression.type=gzip
 类似资料:
  • 我有这样的配置,需要用于Spring Boot应用程序。 默认情况下,spring-boot会拾取位于src/main/resources/中的application.properties文件 我想改变这个路径并将Spring Boot到不同的application.properties文件 我可以使用 有没有任何替代的解决方案,我可以做到这一点,而不通过命令行传递参数? 在我的主要班级 在我尝试

  • 每个人都知道如果我们想要读取属性文件,我们可以这样做: 但是,现在我有了一个类似于SpringBoot的框架。它可以将Spring与Mybatis集成在一起。 问题是前面的代码只能读取我的项目类路径文件,但我需要使用我的框架读取project属性文件。我是怎么做的?

  • 我试图实现的是在application.yml文件上指定一个目录,该目录直接位于类路径上(在/resources下)。我希望能有这样的东西: 使用这种方法,IDE将始终默认为application-dev.yml。当我通过gradle构建应用程序,并在传递命令行参数的同时运行它时,我可以指定配置文件,从而加载适当的文件。理想情况下,能够做到这一点: java-jar-dspring.profile

  • 我想将@EnableJPARepositories基本包的配置外部化。 我有两个不同的样品包如下 com。项目道博士 我尝试了下面的属性外部化(不适用于多个包) ProjectConfig.class 配置。性质 对于多个包,是否有其他方法将此配置外部化? 谢谢

  • 我有一个内部application.yml文件,位于classpath资源中,包含以下字段: 有一个外部配置文件:config.properties。它定义了一些要在我的服务器上下文中重写的字段。文件配置.属性: 我正在使用以下命令运行应用程序: 如何使spring正确地重写内部配置文件,使其只重写额外的属性(redis.hostname、redis.password),但仍然保留在内部文件中定义

  • 我想在spring Boot中设置3个配置文件:production,development,test,使用外部配置文件。 应用程序类: AppConfig类: