Question:

Spring Cloud Stream Kafka-Streams cannot configure SSL for my consumers and producers

骆雅昶
2023-03-14

I am struggling to configure Spring Cloud Stream for Kafka Streams correctly so that it uses SSL with a truststore and a keystore.

In my application I have multiple streams running, and the SSL configuration should be the same for all of them.

stream1: Topic1 > Topic2

stream2: Topic2 + Topic3 > Topic4

stream3: Topic4 > Topic5

I am using the latest Spring Cloud Stream framework with Kafka Streams and Avro models. I was able to configure the schema registry.

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
      bindings:
        stream1-in-0:
          destination: topic1
        stream1-out-0:
          destination: topic2
        stream2-in-0:
          destination: topic2
        stream2-in-1:
          destination: topic3
        stream2-out-0:
          destination: topic4
        stream3-in-0:
          destination: topic4
        stream3-out-0:
          destination: topic5
      kafka:
        binder:
          brokers: kafkabrokerurl.com:9092
          configuration: # not recognized at all
            security.protocol: SSL
            ssl.truststore.location: /mnt/truststore.jks
            ssl.truststore.type: JKS
            ssl.keystore.location: /mnt/keystore.jks
            ssl.keystore.type: JKS
            ssl.enabled.protocols: TLSv1.2
        bindings:
          default:
            consumer:
              resetOffsets: false
              startOffset: latest
          stream1-in-0:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream1-out-0:
            producer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream2-in-0:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream2-in-1:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
              materializedAs: sbinfo-outage-mapping-store
          stream2-out-0:
            producer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream3-in-0:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream3-out-0:
            producer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        streams:
          binder:
            configuration:
              schema.registry.url: https://schemaregistryurl.com # this works
On startup the AdminClientConfig log shows that none of the SSL properties were applied (security.protocol stays at PLAINTEXT and the store locations are null):

 o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [kafkabrokerurl.com:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

I found an older issue here and tried that approach, but the result was the same: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/129

          configuration: # not recognized at all
            "[security.protocol]": SSL
            "[ssl.truststore.location]": /mnt/truststore.jks
            "[ssl.truststore.type]": JKS
            "[ssl.keystore.location]": /mnt/keystore.jks
            "[ssl.keystore.type]": JKS
            "[ssl.enabled.protocols]": TLSv1.2

I learned that this configuration does not work when multiple binders are used, so I also tried the approach of defining a binder name, but the application complains that it does not recognize it.

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
      bindings:
        stream1-in-0:
          destination: topic1
          binder: ssl
        stream1-out-0:
          destination: topic2
          binder: ssl
        stream2-in-0:
          destination: topic2
          binder: ssl
        stream2-in-1:
          destination: topic3
          binder: ssl
        stream2-out-0:
          destination: topic4
          binder: ssl
        stream3-in-0:
          destination: topic4
          binder: ssl
        stream3-out-0:
          destination: topic5
          binder: ssl
      binders:
        ssl:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers:
                      configuration:
                        security.protocol: SSL
                        ssl.truststore.location: /mnt/secrets/truststore.jks
                        ssl.truststore.type: JKS
                        ssl.keystore.location: /mnt/secrets/keystore.jks
                        ssl.keystore.type: JKS
                        ssl.enabled.protocols: TLSv1.2

Error:

2021-07-15 17:11:14.634 ERROR 5216 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: kstream

1 Answer

高高雅
2023-03-14

The streams element is missing from the property name - you are configuring the Kafka MessageChannel binder instead of the Kafka Streams binder.

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              security:
                protocol: SSL
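
For completeness, here is a minimal sketch of the full SSL block under the Kafka Streams binder path, reusing the property values from the question (treat the broker URL and store locations as placeholders; ssl.truststore.password / ssl.keystore.password entries may also be required depending on how the stores were created, and are not shown in the question):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: kafkabrokerurl.com:9092
            configuration:
              # applied to all Kafka Streams consumers, producers and admin
              # clients created by this binder; values copied from the question
              security.protocol: SSL
              ssl.truststore.location: /mnt/truststore.jks
              ssl.truststore.type: JKS
              ssl.keystore.location: /mnt/keystore.jks
              ssl.keystore.type: JKS
              ssl.enabled.protocols: TLSv1.2
              schema.registry.url: https://schemaregistryurl.com

With this single default binder there is no need for the custom binders: section from the question; that section is what caused the Unknown binder configuration: kstream error, because the Kafka Streams bindings could no longer find a default kstream binder once explicit binders were declared. After a restart, the AdminClientConfig log should then show security.protocol = SSL and the truststore/keystore locations filled in.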