当前位置: 首页 > 知识库问答 >
问题:

使用Spring Cloud Streams时如何在代码中设置Kafka Streams属性?

习海
2023-03-14

在使用Kafka的Spring Boot中,我可以如下设置ConsumerFactory的属性:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Bean
public ConsumerFactory<String, EnrichedOrder> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "barnwaldo");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EnrichedOrderDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, EnrichedOrder> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, EnrichedOrder> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

}

使用Kafka Streams,我可以在代码中设置属性,如下所示:

    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

使用Spring Cloud Streams和Kafka Streams时,所有属性似乎仅通过应用程序输入。属性或应用程序。资源文件夹中的yml文件,如

spring.cloud.stream.bindings:
    output:
        contentType: application/json
        destination: data2
    input:
        contentType: application/json
        destination: data1
spring.cloud.stream.kafka.streams:
    binder:
      brokers: localhost
      configuration:
        commit.interval.ms: 1000
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    bindings.input.consumer:
        applicationId: data-tester

在将Spring Cloud Streams与Kafka Streams一起使用时,是否有办法在HashMap或Properties中包含属性。

也许这可以通过KafkaMessageChannelBinder或通过扩展AbstractMessageChannelBinder来实现-请参阅https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/7355ada4613ad50fe95430f1859d4ea65f004be1/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java.

我找不到关于这个的留档;非常感谢任何帮助。

共有1个答案

养学
2023-03-14

默认情况下,它在活页夹级别具有支持,其中属性应以spring作为前缀。云流动Kafka。流。活页夹 文字

https://cloud.spring.io/spring-cloud-static/Greenwich.M3/multi/multi__apache_kafka_streams_binder.html#_configuration_options_3

如果您看到KafkaStreamsBinderSupportAutoConfiguration类,您可以看到bean配置,它读取yaml属性并设置为kafka流。

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/master/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java

 类似资料:
  • 我有许多s和它们每个的下面,一个具有

  • 问题内容: 我正在尝试运行我的Selenium Java代码来测试网页。但是由于网络限制,网页无法加载。当我手动设置代理并在浏览器中单击URL时,它可以正常工作。现在,我需要在运行selenium代码时传递那些代理设置。请帮我。 我尝试下面的代码,但仍然显示相同的错误: 问题答案: 问题已通过以下代码解决-

  • 问题内容: 我知道毕加索将图像加载到imageview等中,但是如何使用毕加索设置布局背景图像? 我的代码: 我在这里遇到的任何错误都表明它无法解决。我有一个ScrollView和相对布局。 问题答案: 使用毕加索的回调 更新: 也请检查此内容。如评论中提到的@OlivierH。

  • 我试图使用php代码设置max_allowed_数据包,但出现以下错误: WordPress数据库错误访问被拒绝;对于require('wp-blog-header.php')、require_once('wp-load.php')、require_once('wp-config.php')、require_once('wp-settings.php')、include('/themes/supr

  • 问题内容: 这段代码返回一个错误:AttributeError:无法设置属性这真的很遗憾,因为我想使用属性而不是调用方法。有谁知道为什么这个简单的例子不起作用? 问题答案: 这是你想要的吗? 取自http://docs.python.org/library/functions.html#property。

  • 我想通过代码强制Chrome调试器中断一行,或者使用某种注释标记,比如。