我正在使用spring cloud stream和Aiven的模式注册表,后者使用confluent的模式注册表。Aiven的架构注册表由密码保护。根据这些说明,需要设置这两个配置参数才能成功访问架构注册表服务器。
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "avnadmin:schema-reg-password");
当我只使用vanilla java的kafka驱动程序时,一切都很好,但如果我使用Spring cloud stream,我不知道如何注入这两个参数。目前,我正在应用程序的“spring.cloud.stream.kafka.binder.configuration”下放置“basic.auth.user.info”和“basic.auth.credentials.source”。yml文件。
这样做,我在模式想要注册的行中得到了“401未授权”
。
更新1:
根据'阿里n的建议,我更新了模式注册客户端bean的配置方式,以便它了解SSL上下文。
@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
try {
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(
new File("path/to/client.keystore.p12")),
"secret".toCharArray());
final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(
new File("path/to/client.truststore.jks")),
"secret".toCharArray());
TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;
SSLContext sslContext = SSLContextBuilder
.create()
.loadKeyMaterial(keyStore, "secret".toCharArray())
.loadTrustMaterial(trustStore, acceptingTrustStrategy)
.build();
HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
httpClient);
ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
new RestTemplate(requestFactory));
schemaRegistryClient.setEndpoint(endpoint);
return schemaRegistryClient;
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}
这有助于消除应用程序启动时的错误并注册架构。然而,每当应用程序想要向Kafka推送消息时,就会再次抛出一个新错误。最后,梅尔森的回答也解决了这一问题。
由于Aiven对Kafka安全协议使用SSL,因此需要使用证书进行身份验证。
您可以按照此页面了解其工作原理。简而言之,您需要运行以下命令来生成证书并导入它们:
openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks
然后您可以使用以下属性来使用证书:
spring.cloud.stream.kafka.streams.binder:
configuration:
security.protocol: SSL
ssl.truststore.location: client.truststore.jks
ssl.truststore.password: secret
ssl.keystore.type: PKCS12
ssl.keystore.location: client.keystore.p12
ssl.keystore.password: secret
ssl.key.password: secret
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
绑定器配置仅处理众所周知的消费者和生产者属性。
您可以在绑定级别设置任意属性。
spring.cloud.stream.kafka.binding.<binding>.consumer.configuration.basic.auth...
我遇到了与我所处的情况相同的问题,即连接到由aiven托管并由basic auth保护的安全模式注册表。为了使其正常工作,我必须配置以下属性:
spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password
我的活页夹的其他属性包括:
spring.cloud.stream.binders.input.type=kafka
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port <-- different from the before mentioned port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false
实际上发生的是,Spring的云流将增加Spring。Kafka。属性。将basic*添加到默认的KafkanConsumerFactory,并将配置添加到KafkanConsumer。在spring kafka初始化期间的某个时刻,将创建一个CachedSchemareRegistryClient,该客户端由这些属性提供。此客户端包含一个名为configureRestService的方法,该方法将检查属性映射是否包含“basic.auth.credentials.source”。当我们通过Spring提供这一点时。Kafka。属性它将找到此属性,并在访问架构注册表的endpoint时负责创建适当的头。
希望你也能这样做。
我使用的是Spring云版本格林威治。SR1、sping-boot-starter 2.1.4。RELEASE、avro-version 1.8.2和confluent.version5.2.1
我想使用keycloak作为身份验证和授权服务器来保护spring cloud数据流服务器并管理对其endpoint的访问。 我按照Spring的文档http://docs.spring.io/spring-cloud-dataflow/docs/1.7.0.rc1/reference/htmlsingle/#configuration-security-oauth2中的描述设置了scdf服务器的
我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化
我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表
提前,我为这个looong问题道歉!事实上,问题并没有那么长,但是我发布了很多我的代码片段,因为我真的不知道什么与解决我的问题相关或不相关...... 我一直在尝试使用以下内容制作一个简单的poc:-Angular 8前端-用于身份验证的Keycloak服务器-Spring云后端架构:-使用Spring Cloud Security保护的Spring Cloud Gateway-Spring Cl
如何使用Spring Boot保护REST服务(OAuth)? 我能得到的最接近的是:http://spring.io/guides/gs/securing-web/http://spring.io/guides/gs/authenticating-ldap/
我正在寻找一个最佳实践和高效的解决方案,以确保通过REST与Web客户端应用程序通信的多个微服务的安全。 当前设置: 这些微服务是用Java制作的,带有Spring框架,并运行在Docker容器中。 客户端是一个Angular 2应用程序。 我创建了一个新的µ服务,它将充当“网关”,是我的web客户端和其他服务之间的唯一通信点。 我从远程身份验证API检索JWT加密令牌(让我们称之为LOCK) 我