当前位置: 首页 > 知识库问答 >
问题:

如何通过YAML中的Spring Cloud Stream提供Kafka Streams属性?

傅自明
2023-03-14

我想移动spring.kafka.streams.*spring.cloud.stream下-这可能吗?我想到了流属性类似于消费者属性生产者属性,但它不起作用。

spring:
  cloud:
    config:
      override-system-properties: false
      server:
        health:
          enabled: false
    stream:
      bindings:
        input_technischerplatz:
          destination: technischerplatz
        output_technischerplatz:
          destination: technischerplatz
      default:
        group: '${spring.application.name}'
        consumer:
          max-attempts: 5
      kafka:
        binder:
          auto-add-partitions: false
          auto-create-topics: false
          brokers: '${values.spring.kafka.bootstrap-servers}'
          configuration:
            header.mode: headers
          consumer-properties:
            allow.auto.create.topics: false
            auto.offset.reset: '${values.spring.kafka.consumer.auto-offset-reset}'
            enable.auto.commit: false
            isolation.level: read_committed
            max.poll.interval.ms: 300000
            max.poll.records: 100
            session.timeout.ms: 300000
          header-mapper-bean-name: defaultKafkaHeaderMapper
          producer-properties:
            acks: all
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            max.in.flight.requests.per.connection: 1
            max.block.ms: '${values.spring.kafka.producer.max-block-ms}'
            retries: 10
          required-acks: -1
  kafka:
    streams:
      applicationId: '${spring.application.name}_streams'
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.timestamp.extractor: org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp
        state.dir: '${values.spring.kafka.streams.properties.state.dir}'

共有1个答案

王伯寅
2023-03-14

您可以使用spring绑定streams属性。云按以下方式传输:

spring.cloud.stream.kafka.streams.binder.applicationId: my-application-id
spring.cloud.stream.kafka.streams.binder.configuration:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

有关更多详细信息,您可以参考留档:

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder

 类似资料:
  • 问题内容: 我正在尝试托管一个我使用Facebook样板在本地创建和测试的React应用。 客户端应用程序与我使用node.js制作的API进行了交互,并且使用该API可以毫无问题地建立安全连接(通过node.js客户端发送我的SSL证书进行测试)。 但是,在使用react发送我的SSL证书而不是自签名证书时,我遇到了困难,这导致我使用chrome并尝试访问https://example.net:

  • 我是nodejs/expressjs新手。有人能解释一下如何通过https服务网页吗? 我得换个方式问这个问题,stackoverflow在抱怨我的帖子主要是代码? 以下是错误转储: 类型错误:对象 # 在对象处没有方法“get”。(/home/john/startup/docm/w2.js:21:5) at Module._compile (module.js:456:26) at Object

  • id=“root” XML的一个示例 代码 //SiebelMessage[@id='root'] 你知道我做错了什么吗?

  • 问题内容: 我认为这在Java中可能无法实现,因为注释及其参数在编译时已解决。我有一个如下的界面, 和另一类, 我用注解标记了许多类,我想知道是否可以避免在每个注解中指定字符串,而宁愿使用 但是,这会产生诸如注释值之类的编译错误,应该将其作为数组初始化程序等。有人知道我如何使用String常量或String []常量为注释提供值吗? 问题答案: 15.28。常数表达式 编译时常量表达式是表示原始类

  • 问题内容: 我想在localhost上的gunicorn下运行django项目。我安装并集成了Gunicorn。当我跑步时: 它可以工作,但是没有任何静态文件(css和js) 我在settings.py中禁用了debug和template_debug(将它们设置为false),但是仍然相同。我想念什么吗? 我称静态为: 问题答案: 在开发模式下以及使用其他服务器进行本地开发时,请将其添加到url.