Question:

Unable to create multiple Kafka binders with SSL configuration

祝锐
2023-03-14

I am trying to connect to a Kafka cluster over the SASL_SSL protocol, with the JAAS configuration below:

spring:
  cloud:
    stream:
      bindings:
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers
            

      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>

        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256 

The configuration above is in line with the sample configuration provided in the official Spring Cloud Stream git repo.

A similar issue raised on the library's git repo claims this was fixed in the latest version, but it does not seem to be. I get the following error:

Spring Boot version: 2.2.8, spring-cloud-stream dependencies version: Horsham.SR6.

Failed to create consumer binding; retrying in 30 seconds | org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:201)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:407)
    at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:65)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createAdminClient(KafkaTopicProvisioner.java:246)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:216)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:183)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:79)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:402)
    ... 12 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:382)
    ... 18 common frames omitted
Caused by: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:804)
    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
    ... 22 common frames omitted
Caused by: sun.security.krb5.RealmException: KrbException: Cannot locate default realm
    at sun.security.krb5.Realm.getDefault(Realm.java:68)
    at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:462)
    at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:471)
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:706)
    ... 38 common frames omitted
Caused by: sun.security.krb5.KrbException: Cannot locate default realm
    at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
    at sun.security.krb5.Realm.getDefault(Realm.java:64)
    ... 41 common frames omitted 

This makes me think the library is not picking up the configuration props correctly, because jaas.loginModule is specified as ScramLoginModule, yet it authenticates with Krb5LoginModule.

Surprisingly, however, when configured as below (the difference being that the SSL credentials in the last section sit outside the binder's environment), it connects to the binder whose credentials are specified in the global SSL props and silently ignores the other binder, without any error logs.

Say the credentials of binder kafka-2-with-ssl are the ones specified in the global SSL props: that binder gets created, and the bindings subscribed to it start consuming events. But this only helps when we need a single binder.

spring:
  cloud:
    stream:
      bindings:
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers
            

      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>

        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256 
            ssl:
              truststore:
                location: <location-2>
                password: <ts-password-2> 
                type: JKS
          jaas:
            loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
            options:
              username: <username-2>
              password: <password-2>

Rest assured there is no problem with the SSL credentials: each of the SSL Kafka binders was tested carefully on its own, and each was created successfully in isolation. The intention is to connect to multiple Kafka binders over the SASL_SSL protocol. Thanks in advance.

1 Answer

潘高洁
2023-03-14

I think you may want to follow the solution implemented in KIP-85 for this issue. Instead of using the JAAS configuration supplied by the Spring Cloud Stream Kafka binder, or setting the java.security.auth.login.config system property, use the sasl.jaas.config property, which takes precedence over the other methods. With sasl.jaas.config you sidestep the JVM's restriction of a single JVM-wide static security context, which otherwise ignores any JAAS configuration found after the first one.
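As a concrete illustration, each binder's environment could carry its own sasl.jaas.config string in place of the jaas section. This is a minimal sketch, not a verified configuration; the placeholders (<broker-hostnames-1>, <username-1>, etc.) are carried over from the question, and it assumes the binder passes everything under configuration straight through to the Kafka clients:

```yaml
spring:
  cloud:
    stream:
      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-1>
                      configuration:
                        security.protocol: SASL_SSL
                        sasl.mechanism: SCRAM-SHA-256
                        # Per-binder JAAS string (KIP-85); note the trailing semicolon is required
                        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username-1>" password="<password-1>";
                        ssl.truststore.location: <location-1>
                        ssl.truststore.password: <ts-password-1>
                        ssl.truststore.type: JKS
```

A second binder (kafka-2-with-ssl) would repeat the same block with its own brokers, truststore, and credentials; because each client gets its own sasl.jaas.config, the two security contexts no longer collide.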

Here is a sample application demonstrating how to connect, as a multi-binder application, to multiple Kafka clusters with different security contexts.
