我的要求是通过SSL与Spring Boot和Apache Camel连接Kafka主题,为此,我编写了以下代码,但我面临一个类似于sun.security.validator.validatoreXception引起的错误:PKIX路径构建失败:sun.security.provider.certPath.SunCertPathBuilderException:无法找到请求目标的有效认证路径
任何人,请帮助我如何解决这个错误。
//in this code i'm configured the SSL
@Configuration
public class Testing {
@Bean
SSLContextParameters sslContextParameters(){
KeyStoreParameters store = new KeyStoreParameters();
store.setResource("kafka.client.truststore.jks");
store.setPassword("123456");
TrustManagersParameters trust = new TrustManagersParameters();
trust.setKeyStore(store);
SSLContextParameters parameters = new SSLContextParameters();
parameters.setTrustManagers(trust);
return parameters;
}
}
在下面的文件中,我使用sslContextParameters参数调用router
@Autowired
SSLContextParameters params;
@Override
public void configure() throws Exception {
from("{{timerOnce}}").process(consumerCreate).to(
"https://xx.xx.xx.xxx/consumers/group-id?sslContextParameters=params");
}
******我使用了另一种通过SSL连接Kafka集群的方法,但不幸的是,它得到了如下的异常*******org.apache.camel.spring.boot.camelspringbootInitializationException:java.io.ioException:无效的密钥存储格式
public Endpoint setupSSLConext(CamelContext camelContext) throws Exception {
KeyStoreParameters keyStoreParameters = new KeyStoreParameters();
// Change this path to point to your truststore/keystore as jks files
keyStoreParameters.setResource("kafka.client.truststore.jks");
keyStoreParameters.setPassword("123456");
KeyManagersParameters keyManagersParameters = new KeyManagersParameters();
keyManagersParameters.setKeyStore(keyStoreParameters);
keyManagersParameters.setKeyPassword("123456");
TrustManagersParameters trustManagersParameters = new TrustManagersParameters();
trustManagersParameters.setKeyStore(keyStoreParameters);
SSLContextParameters sslContextParameters = new SSLContextParameters();
sslContextParameters.setKeyManagers(keyManagersParameters);
sslContextParameters.setTrustManagers(trustManagersParameters);
HttpComponent httpComponent = camelContext.getComponent("https4", HttpComponent.class);
httpComponent.setSslContextParameters(sslContextParameters);
// This is important to make your cert skip CN/Hostname checks
httpComponent.setX509HostnameVerifier(new X509HostnameVerifier() {
@Override
public void verify(String s, SSLSocket sslSocket) throws IOException {
}
@Override
public void verify(String s, X509Certificate x509Certificate) throws SSLException {
}
@Override
public void verify(String s, String[] strings, String[] strings1) throws SSLException {
}
@Override
public boolean verify(String s, SSLSession sslSession) {
// I don't mind just return true for all or you can add your own logic
return true;
}
});
return httpComponent.createEndpoint("https://XX.XX.X.XXX/consumers/");
}
public void configure() throws Exception {
Endpoint createEndpoint = cdcHelper.setupSSLConext(context);
from("{{timerOnce}}").process(consumerCreate)
.to(createEndpoint); // calling kafka consumer
}
}
您可以按照下面的方法使用Apache Camel和Springboot设置Kafka使用者。
将以下属性添加到应用程序中。properties
# kafka configuration
kafka.topic=iot1
kafka.camelKafkaOptions.groupId=grp1
kafka.camelKafkaOptions.brokers=kafka.localtest:9093
kafka.camelKafkaOptions.consumersCount=10
kafka.camelKafkaOptions.autoOffsetReset=latest
kafka.camelKafkaOptions.autoCommitEnable=false
kafka.camelKafkaOptions.allowManualCommit=true
kafka.camelKafkaOptions.metadataMaxAgeMs=5000
kafka.camelKafkaOptions.securityProtocol=SSL
kafka.camelKafkaOptions.sslEndpointAlgorithm=HTTPS
kafka.camelKafkaOptions..sslKeyPassword=<ssl key password>
kafka.camelKafkaOptions..sslKeystoreLocation=<keystorepath>
kafka.camelKafkaOptions.sslKeystorePassword=<sslkeystore password>
kafka.camelKafkaOptions.sslTruststoreLocation=<truststore path>
kafka.camelKafkaOptions.sslTruststorePassword=<password>
and create a utility method, to construct a kafka url
@Component
public class KafkaUtility {
public String getKafkaEndpoint(String topicName ){
StringBuilder urlBuilder = new StringBuilder("kafka:" + topicName);
if (!getCamelKafkaOptions().isEmpty()) {
urlBuilder.append("&");
getCamelKafkaOptions().forEach(
(key, value) -> {
if (StringUtils.isNotBlank(value)) {
appendConfig(urlBuilder, key, value);
}
}
);
}
// strip the last "&" symbol
String kafkaURL = stripLastAnd(urlBuilder.toString());
return kafkaURL;
}
}
In your route builder, implement the below
@Autowired
private KafkaUtility kafkaUtility;
from(kafkaUtility.getKafkaEndpoint())
.process("yourprocessor")
.to("tourl");
当我使用带SSL的apache camel连接到Kafka集群时,我面临以下问题,请任何人帮助解决这个问题 javax . net . SSL . SSL handshake异常:sun . security . validator . validator异常:PKIX路径构建失败:sun . security . provider . certpath . suncertpathbuildere
我尝试了kafka-console-consumer.sh和kafka-console-producer.sh,它工作得很好。我能够看到生产者在消费者中发送的消息 1)我已经下载了s3连接器(https://docs.confluent.io/current/connect/kafka-connect-S3/index.html) 2)将文件解压缩到/home/ec2-user/plugins/
代码片段如下所示: 如果有人有决议,请帮忙?
我正在尝试在启用SSL的Kafka集群中注册MySql Debezium连接器。我为此目的使用的卷曲是: Debezium无法创建数据库。历史记录主题,它将失败,并出现以下错误: 美化错误:
我有一个Spring启动应用程序,它使用来自 Kafka 集群中某个主题(例如 topic1)的消息。这就是我的代码目前的样子。 现在我想从另一个Kafka集群中的不同主题开始消费。一种方法是为此创建另一个bean。但是有更好的方法吗?
我目前使用的是Kafka connect集群,它有两个节点,使用的是同一个 当使用curl/connectors时,我可以获得创建的连接器列表,但我看不到有关活动节点的信息,健康检查。。。