当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring云流访问使用密码保护的汇合模式注册服务器?

况嘉运
2023-03-14

我正在使用spring cloud stream和Aiven的模式注册表,后者使用confluent的模式注册表。Aiven的架构注册表由密码保护。根据这些说明,需要设置这两个配置参数才能成功访问架构注册表服务器。

 props.put("basic.auth.credentials.source", "USER_INFO");
 props.put("basic.auth.user.info", "avnadmin:schema-reg-password");

当我只使用vanilla java的kafka驱动程序时,一切都很好,但如果我使用Spring cloud stream,我不知道如何注入这两个参数。目前,我正在应用程序的“spring.cloud.stream.kafka.binder.configuration”下放置“basic.auth.user.info”和“basic.auth.credentials.source”。yml文件。

这样做,我在模式想要注册的行中得到了“401未授权”

更新1:

根据'阿里n的建议,我更新了模式注册客户端bean的配置方式,以便它了解SSL上下文。

@Bean
public SchemaRegistryClient schemaRegistryClient(
    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
  try {
    final KeyStore keyStore = KeyStore.getInstance("PKCS12");
    keyStore.load(new FileInputStream(
            new File("path/to/client.keystore.p12")),
        "secret".toCharArray());

    final KeyStore trustStore = KeyStore.getInstance("JKS");
    trustStore.load(new FileInputStream(
            new File("path/to/client.truststore.jks")),
        "secret".toCharArray());

    TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;

    SSLContext sslContext = SSLContextBuilder
        .create()
        .loadKeyMaterial(keyStore, "secret".toCharArray())
        .loadTrustMaterial(trustStore, acceptingTrustStrategy)
        .build();

    HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
    ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
        httpClient);
    ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
        new RestTemplate(requestFactory));
    schemaRegistryClient.setEndpoint(endpoint);
    return schemaRegistryClient;
  } catch (Exception ex) {
    ex.printStackTrace();
    return null;
  }
}

这有助于消除应用程序启动时的错误并注册架构。然而,每当应用程序想要向Kafka推送消息时,就会再次抛出一个新错误。最后,梅尔森的回答也解决了这一问题。

共有3个答案

爱刚捷
2023-03-14

由于Aiven对Kafka安全协议使用SSL,因此需要使用证书进行身份验证。

您可以按照此页面了解其工作原理。简而言之,您需要运行以下命令来生成证书并导入它们:

openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks

然后您可以使用以下属性来使用证书:

spring.cloud.stream.kafka.streams.binder:
  configuration:
    security.protocol: SSL
    ssl.truststore.location: client.truststore.jks
    ssl.truststore.password: secret
    ssl.keystore.type: PKCS12
    ssl.keystore.location: client.keystore.p12
    ssl.keystore.password: secret
    ssl.key.password: secret
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
颜新
2023-03-14

绑定器配置仅处理众所周知的消费者和生产者属性。

您可以在绑定级别设置任意属性。

spring.cloud.stream.kafka.binding.<binding>.consumer.configuration.basic.auth...
巢安澜
2023-03-14

我遇到了与我所处的情况相同的问题,即连接到由aiven托管并由basic auth保护的安全模式注册表。为了使其正常工作,我必须配置以下属性:

spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password

我的活页夹的其他属性包括:

spring.cloud.stream.binders.input.type=kafka
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port <-- different from the before mentioned port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false

实际上发生的是,Spring的云流将增加Spring。Kafka。属性。将basic*添加到默认的KafkanConsumerFactory,并将配置添加到KafkanConsumer。在spring kafka初始化期间的某个时刻,将创建一个CachedSchemareRegistryClient,该客户端由这些属性提供。此客户端包含一个名为configureRestService的方法,该方法将检查属性映射是否包含“basic.auth.credentials.source”。当我们通过Spring提供这一点时。Kafka。属性它将找到此属性,并在访问架构注册表的endpoint时负责创建适当的头。

希望你也能这样做。

我使用的是Spring云版本格林威治。SR1、sping-boot-starter 2.1.4。RELEASE、avro-version 1.8.2和confluent.version5.2.1

 类似资料:
  • 我想使用keycloak作为身份验证和授权服务器来保护spring cloud数据流服务器并管理对其endpoint的访问。 我按照Spring的文档http://docs.spring.io/spring-cloud-dataflow/docs/1.7.0.rc1/reference/htmlsingle/#configuration-security-oauth2中的描述设置了scdf服务器的

  • 我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 提前,我为这个looong问题道歉!事实上,问题并没有那么长,但是我发布了很多我的代码片段,因为我真的不知道什么与解决我的问题相关或不相关...... 我一直在尝试使用以下内容制作一个简单的poc:-Angular 8前端-用于身份验证的Keycloak服务器-Spring云后端架构:-使用Spring Cloud Security保护的Spring Cloud Gateway-Spring Cl

  • 如何使用Spring Boot保护REST服务(OAuth)? 我能得到的最接近的是:http://spring.io/guides/gs/securing-web/http://spring.io/guides/gs/authenticating-ldap/

  • 我正在寻找一个最佳实践和高效的解决方案,以确保通过REST与Web客户端应用程序通信的多个微服务的安全。 当前设置: 这些微服务是用Java制作的,带有Spring框架,并运行在Docker容器中。 客户端是一个Angular 2应用程序。 我创建了一个新的µ服务,它将充当“网关”,是我的web客户端和其他服务之间的唯一通信点。 我从远程身份验证API检索JWT加密令牌(让我们称之为LOCK) 我